diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/protobuf/package-info.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/protobuf/package-info.java
new file mode 100644
index 0000000000..b5bcf18eda
--- /dev/null
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/protobuf/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * Defines a {@link com.google.cloud.dataflow.sdk.coders.Coder}
+ * for Protocol Buffers messages, {@code ProtoCoder}.
+ *
+ * @see com.google.cloud.dataflow.sdk.coders.protobuf.ProtoCoder
+ */
+package com.google.cloud.dataflow.sdk.coders.protobuf;
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIO.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIO.java
index 7ecccf14a6..7d59b09c8d 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIO.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIO.java
@@ -72,7 +72,7 @@
*
Reading from Cloud Bigtable
*
* The Bigtable source returns a set of rows from a single table, returning a
- * {@code PCollection<Row>}.
+ * {@code PCollection}.
*
* To configure a Cloud Bigtable source, you must supply a table id and a {@link BigtableOptions}
* or builder configured with the project and other information necessary to identify the
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/package-info.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/package-info.java
new file mode 100644
index 0000000000..112a954d71
--- /dev/null
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * Defines transforms for reading and writing from Google Cloud Bigtable.
+ *
+ * @see com.google.cloud.dataflow.sdk.io.bigtable.BigtableIO
+ */
+package com.google.cloud.dataflow.sdk.io.bigtable;
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
index d0cc4e53d5..0feae957f8 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
@@ -952,6 +952,9 @@ private void groupByKeyHelper(
context.addInput(
PropertyNames.SERIALIZED_FN,
byteArrayToJsonString(serializeToByteArray(windowingStrategy)));
+ context.addInput(
+ PropertyNames.IS_MERGING_WINDOW_FN,
+ !windowingStrategy.getWindowFn().isNonMerging());
}
});
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java
index 1c0279897a..2a164c3518 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java
@@ -18,7 +18,6 @@
import com.google.cloud.dataflow.sdk.io.Read.Bounded;
import com.google.cloud.dataflow.sdk.io.Source.Reader;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/CompletionCallback.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/CompletionCallback.java
new file mode 100644
index 0000000000..2792631560
--- /dev/null
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/CompletionCallback.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+
+/**
+ * A callback for completing a bundle of input.
+ */
+interface CompletionCallback {
+ /**
+ * Handle a successful result.
+ */
+ void handleResult(CommittedBundle> inputBundle, InProcessTransformResult result);
+
+ /**
+ * Handle a result that terminated abnormally due to the provided {@link Throwable}.
+ */
+ void handleThrowable(CommittedBundle> inputBundle, Throwable t);
+}
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EvaluatorKey.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EvaluatorKey.java
index 745f8f2718..307bc5cdb5 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EvaluatorKey.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EvaluatorKey.java
@@ -15,7 +15,6 @@
*/
package com.google.cloud.dataflow.sdk.runners.inprocess;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext;
import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
import java.util.Objects;
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java
new file mode 100644
index 0000000000..ae686f2979
--- /dev/null
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java
@@ -0,0 +1,394 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.FiredTimers;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.util.KeyedWorkItem;
+import com.google.cloud.dataflow.sdk.util.KeyedWorkItems;
+import com.google.cloud.dataflow.sdk.util.TimeDomain;
+import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PValue;
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+
+import javax.annotation.Nullable;
+
+/**
+ * An {@link InProcessExecutor} that uses an underlying {@link ExecutorService} and
+ * {@link InProcessEvaluationContext} to execute a {@link Pipeline}.
+ */
+final class ExecutorServiceParallelExecutor implements InProcessExecutor {
+ private static final Logger LOG = LoggerFactory.getLogger(ExecutorServiceParallelExecutor.class);
+
+ private final ExecutorService executorService;
+
+ private final Map>> valueToConsumers;
+ private final Set keyedPValues;
+ private final TransformEvaluatorRegistry registry;
+ private final InProcessEvaluationContext evaluationContext;
+
+ private final ConcurrentMap currentEvaluations;
+ private final ConcurrentMap, Boolean> scheduledExecutors;
+
+ private final Queue allUpdates;
+ private final BlockingQueue visibleUpdates;
+
+ private final TransformExecutorService parallelExecutorService;
+ private final CompletionCallback defaultCompletionCallback;
+
+ private Collection> rootNodes;
+
+ public static ExecutorServiceParallelExecutor create(
+ ExecutorService executorService,
+ Map>> valueToConsumers,
+ Set keyedPValues,
+ TransformEvaluatorRegistry registry,
+ InProcessEvaluationContext context) {
+ return new ExecutorServiceParallelExecutor(
+ executorService, valueToConsumers, keyedPValues, registry, context);
+ }
+
+ private ExecutorServiceParallelExecutor(
+ ExecutorService executorService,
+ Map>> valueToConsumers,
+ Set keyedPValues,
+ TransformEvaluatorRegistry registry,
+ InProcessEvaluationContext context) {
+ this.executorService = executorService;
+ this.valueToConsumers = valueToConsumers;
+ this.keyedPValues = keyedPValues;
+ this.registry = registry;
+ this.evaluationContext = context;
+
+ currentEvaluations = new ConcurrentHashMap<>();
+ scheduledExecutors = new ConcurrentHashMap<>();
+
+ this.allUpdates = new ConcurrentLinkedQueue<>();
+ this.visibleUpdates = new ArrayBlockingQueue<>(20);
+
+ parallelExecutorService =
+ TransformExecutorServices.parallel(executorService, scheduledExecutors);
+ defaultCompletionCallback = new DefaultCompletionCallback();
+ }
+
+ @Override
+ public void start(Collection> roots) {
+ rootNodes = ImmutableList.copyOf(roots);
+ Runnable monitorRunnable = new MonitorRunnable();
+ executorService.submit(monitorRunnable);
+ }
+
+ @SuppressWarnings("unchecked")
+ public void scheduleConsumption(
+ AppliedPTransform, ?, ?> consumer,
+ @Nullable CommittedBundle> bundle,
+ CompletionCallback onComplete) {
+ evaluateBundle(consumer, bundle, onComplete);
+ }
+
+ private void evaluateBundle(
+ final AppliedPTransform, ?, ?> transform,
+ @Nullable final CommittedBundle bundle,
+ final CompletionCallback onComplete) {
+ TransformExecutorService transformExecutor;
+ if (isKeyed(bundle.getPCollection())) {
+ final StepAndKey stepAndKey =
+ StepAndKey.of(transform, bundle == null ? null : bundle.getKey());
+ transformExecutor = getSerialExecutorService(stepAndKey);
+ } else {
+ transformExecutor = parallelExecutorService;
+ }
+ TransformExecutor callable =
+ TransformExecutor.create(
+ registry, evaluationContext, bundle, transform, onComplete, transformExecutor);
+ transformExecutor.schedule(callable);
+ }
+
+ private boolean isKeyed(PValue pvalue) {
+ return keyedPValues.contains(pvalue);
+ }
+
+ private void scheduleConsumers(CommittedBundle> bundle) {
+ for (AppliedPTransform, ?, ?> consumer : valueToConsumers.get(bundle.getPCollection())) {
+ scheduleConsumption(consumer, bundle, defaultCompletionCallback);
+ }
+ }
+
+ private TransformExecutorService getSerialExecutorService(StepAndKey stepAndKey) {
+ if (!currentEvaluations.containsKey(stepAndKey)) {
+ currentEvaluations.putIfAbsent(
+ stepAndKey, TransformExecutorServices.serial(executorService, scheduledExecutors));
+ }
+ return currentEvaluations.get(stepAndKey);
+ }
+
+ @Override
+ public void awaitCompletion() throws Throwable {
+ VisibleExecutorUpdate update;
+ do {
+ update = visibleUpdates.take();
+ if (update.throwable.isPresent()) {
+ throw update.throwable.get();
+ }
+ } while (!update.isDone());
+ executorService.shutdown();
+ }
+
+ /**
+ * The default {@link CompletionCallback}. The default completion callback is used to complete
+ * transform evaluations that are triggered due to the arrival of elements from an upstream
+ * transform, or for a source transform.
+ */
+ private class DefaultCompletionCallback implements CompletionCallback {
+ @Override
+ public void handleResult(CommittedBundle> inputBundle, InProcessTransformResult result) {
+ Iterable extends CommittedBundle>> resultBundles =
+ evaluationContext.handleResult(inputBundle, Collections.emptyList(), result);
+ for (CommittedBundle> outputBundle : resultBundles) {
+ allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle));
+ }
+ }
+
+ @Override
+ public void handleThrowable(CommittedBundle> inputBundle, Throwable t) {
+ allUpdates.offer(ExecutorUpdate.fromThrowable(t));
+ }
+ }
+
+ /**
+ * A {@link CompletionCallback} where the completed bundle was produced to deliver some collection
+ * of {@link TimerData timers}. When the evaluator completes successfully, reports all of the
+ * timers used to create the input to the {@link InProcessEvaluationContext evaluation context}
+ * as part of the result.
+ */
+ private class TimerCompletionCallback implements CompletionCallback {
+ private final Iterable timers;
+
+ private TimerCompletionCallback(Iterable timers) {
+ this.timers = timers;
+ }
+
+ @Override
+ public void handleResult(CommittedBundle> inputBundle, InProcessTransformResult result) {
+ Iterable extends CommittedBundle>> resultBundles =
+ evaluationContext.handleResult(inputBundle, timers, result);
+ for (CommittedBundle> outputBundle : resultBundles) {
+ allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle));
+ }
+ }
+
+ @Override
+ public void handleThrowable(CommittedBundle> inputBundle, Throwable t) {
+ allUpdates.offer(ExecutorUpdate.fromThrowable(t));
+ }
+ }
+
+ /**
+ * An internal status update on the state of the executor.
+ *
+ * Used to signal when the executor should be shut down (due to an exception).
+ */
+ private static class ExecutorUpdate {
+ private final Optional extends CommittedBundle>> bundle;
+ private final Optional extends Throwable> throwable;
+
+ public static ExecutorUpdate fromBundle(CommittedBundle> bundle) {
+ return new ExecutorUpdate(bundle, null);
+ }
+
+ public static ExecutorUpdate fromThrowable(Throwable t) {
+ return new ExecutorUpdate(null, t);
+ }
+
+ private ExecutorUpdate(CommittedBundle> producedBundle, Throwable throwable) {
+ this.bundle = Optional.fromNullable(producedBundle);
+ this.throwable = Optional.fromNullable(throwable);
+ }
+
+ public Optional extends CommittedBundle>> getBundle() {
+ return bundle;
+ }
+
+ public Optional extends Throwable> getException() {
+ return throwable;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(ExecutorUpdate.class)
+ .add("bundle", bundle)
+ .add("exception", throwable)
+ .toString();
+ }
+ }
+
+ /**
+ * An update of interest to the user. Used in {@link #awaitCompletion} to decide whether to
+ * return normally or throw an exception.
+ */
+ private static class VisibleExecutorUpdate {
+ private final Optional extends Throwable> throwable;
+ private final boolean done;
+
+ public static VisibleExecutorUpdate fromThrowable(Throwable e) {
+ return new VisibleExecutorUpdate(false, e);
+ }
+
+ public static VisibleExecutorUpdate finished() {
+ return new VisibleExecutorUpdate(true, null);
+ }
+
+ private VisibleExecutorUpdate(boolean done, @Nullable Throwable exception) {
+ this.throwable = Optional.fromNullable(exception);
+ this.done = done;
+ }
+
+ public boolean isDone() {
+ return done;
+ }
+ }
+
+ private class MonitorRunnable implements Runnable {
+ private final String runnableName =
+ String.format(
+ "%s$%s-monitor",
+ evaluationContext.getPipelineOptions().getAppName(),
+ ExecutorServiceParallelExecutor.class.getSimpleName());
+
+ @Override
+ public void run() {
+ String oldName = Thread.currentThread().getName();
+ Thread.currentThread().setName(runnableName);
+ try {
+ ExecutorUpdate update = allUpdates.poll();
+ if (update != null) {
+ LOG.debug("Executor Update: {}", update);
+ if (update.getBundle().isPresent()) {
+ scheduleConsumers(update.getBundle().get());
+ } else if (update.getException().isPresent()) {
+ visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(update.getException().get()));
+ }
+ }
+ fireTimers();
+ mightNeedMoreWork();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.error("Monitor died due to being interrupted");
+ while (!visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(e))) {
+ visibleUpdates.poll();
+ }
+ } catch (Throwable t) {
+ LOG.error("Monitor thread died due to throwable", t);
+ while (!visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(t))) {
+ visibleUpdates.poll();
+ }
+ } finally {
+ if (!shouldShutdown()) {
+ // The monitor thread should always be scheduled; but we only need to be scheduled once
+ executorService.submit(this);
+ }
+ Thread.currentThread().setName(oldName);
+ }
+ }
+
+ private void fireTimers() throws Exception {
+ try {
+ for (Map.Entry, Map> transformTimers :
+ evaluationContext.extractFiredTimers().entrySet()) {
+ AppliedPTransform, ?, ?> transform = transformTimers.getKey();
+ for (Map.Entry keyTimers : transformTimers.getValue().entrySet()) {
+ for (TimeDomain domain : TimeDomain.values()) {
+ Collection delivery = keyTimers.getValue().getTimers(domain);
+ if (delivery.isEmpty()) {
+ continue;
+ }
+ KeyedWorkItem work =
+ KeyedWorkItems.timersWorkItem(keyTimers.getKey(), delivery);
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ CommittedBundle> bundle =
+ InProcessBundle.>keyed(
+ (PCollection) transform.getInput(), keyTimers.getKey())
+ .add(WindowedValue.valueInEmptyWindows(work))
+ .commit(Instant.now());
+ scheduleConsumption(transform, bundle, new TimerCompletionCallback(delivery));
+ }
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("Internal Error while delivering timers", e);
+ throw e;
+ }
+ }
+
+ private boolean shouldShutdown() {
+ if (evaluationContext.isDone()) {
+ LOG.debug("Pipeline is finished. Shutting down. {}");
+ while (!visibleUpdates.offer(VisibleExecutorUpdate.finished())) {
+ visibleUpdates.poll();
+ }
+ executorService.shutdown();
+ return true;
+ }
+ return false;
+ }
+
+ private void mightNeedMoreWork() {
+ synchronized (scheduledExecutors) {
+ for (TransformExecutor> executor : scheduledExecutors.keySet()) {
+ Thread thread = executor.getThread();
+ if (thread != null) {
+ switch (thread.getState()) {
+ case BLOCKED:
+ case WAITING:
+ case TERMINATED:
+ case TIMED_WAITING:
+ break;
+ default:
+ return;
+ }
+ }
+ }
+ }
+ // All current TransformExecutors are blocked; add more work from the roots.
+ for (AppliedPTransform, ?, ?> root : rootNodes) {
+ scheduleConsumption(root, null, defaultCompletionCallback);
+ }
+ }
+ }
+}
+
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactory.java
index 14428888e2..bde1df45e9 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactory.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactory.java
@@ -16,7 +16,6 @@
package com.google.cloud.dataflow.sdk.runners.inprocess;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
import com.google.cloud.dataflow.sdk.transforms.Flatten;
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java
index 0347281749..ec63be84c9 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java
@@ -22,7 +22,6 @@
import com.google.cloud.dataflow.sdk.coders.IterableCoder;
import com.google.cloud.dataflow.sdk.coders.KvCoder;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
import com.google.cloud.dataflow.sdk.runners.inprocess.StepTransformResult.Builder;
import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java
index e280e22d2b..094526d962 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java
@@ -866,7 +866,7 @@ private void updatePending(
* {@link #getWatermarks(AppliedPTransform)}, the output watermark will be equal to
* {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
*/
- public boolean isDone() {
+ public boolean allWatermarksAtPositiveInfinity() {
for (Map.Entry, TransformWatermarks> watermarksEntry :
transformToWatermarks.entrySet()) {
Instant endOfTime = THE_END_OF_TIME.get();
@@ -1209,8 +1209,11 @@ public TimerUpdateBuilder deletedTimer(TimerData deletedTimer) {
* and deletedTimers.
*/
public TimerUpdate build() {
- return new TimerUpdate(key, ImmutableSet.copyOf(completedTimers),
- ImmutableSet.copyOf(setTimers), ImmutableSet.copyOf(deletedTimers));
+ return new TimerUpdate(
+ key,
+ ImmutableSet.copyOf(completedTimers),
+ ImmutableSet.copyOf(setTimers),
+ ImmutableSet.copyOf(deletedTimers));
}
}
@@ -1245,6 +1248,13 @@ Iterable extends TimerData> getDeletedTimers() {
return deletedTimers;
}
+ /**
+ * Returns a {@link TimerUpdate} that is like this one, but with the specified completed timers.
+ */
+ public TimerUpdate withCompletedTimers(Iterable completedTimers) {
+ return new TimerUpdate(this.key, completedTimers, setTimers, deletedTimers);
+ }
+
@Override
public int hashCode() {
return Objects.hash(key, completedTimers, setTimers, deletedTimers);
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundle.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundle.java
index cc20161097..112ba17d14 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundle.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundle.java
@@ -1,5 +1,5 @@
/*
- * Copyright (C) 2015 Google Inc.
+ * Copyright (C) 2016 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
@@ -22,7 +22,6 @@
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.common.base.MoreObjects;
-import com.google.common.base.MoreObjects.ToStringHelper;
import com.google.common.collect.ImmutableList;
import org.joda.time.Instant;
@@ -64,6 +63,11 @@ private InProcessBundle(PCollection pcollection, boolean keyed, Object key) {
this.elements = ImmutableList.builder();
}
+ @Override
+ public PCollection getPCollection() {
+ return pcollection;
+ }
+
@Override
public InProcessBundle add(WindowedValue element) {
checkState(!committed, "Can't add element %s to committed bundle %s", element, this);
@@ -105,12 +109,12 @@ public Instant getSynchronizedProcessingOutputWatermark() {
@Override
public String toString() {
- ToStringHelper toStringHelper =
- MoreObjects.toStringHelper(this).add("pcollection", pcollection);
- if (keyed) {
- toStringHelper = toStringHelper.add("key", key);
- }
- return toStringHelper.add("elements", elements).toString();
+ return MoreObjects.toStringHelper(this)
+ .omitNullValues()
+ .add("pcollection", pcollection)
+ .add("key", key)
+ .add("elements", committedElements)
+ .toString();
}
};
}
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java
new file mode 100644
index 0000000000..2908fba818
--- /dev/null
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java
@@ -0,0 +1,383 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.runners.inprocess.GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.FiredTimers;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.TransformWatermarks;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.PCollectionViewWriter;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Trigger;
+import com.google.cloud.dataflow.sdk.util.ExecutionContext;
+import com.google.cloud.dataflow.sdk.util.SideInputReader;
+import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
+import com.google.cloud.dataflow.sdk.util.common.CounterSet;
+import com.google.cloud.dataflow.sdk.util.state.CopyOnAccessInMemoryStateInternals;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PCollection.IsBounded;
+import com.google.cloud.dataflow.sdk.values.PCollectionView;
+import com.google.cloud.dataflow.sdk.values.PValue;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import javax.annotation.Nullable;
+
+/**
+ * The evaluation context for a specific pipeline being executed by the
+ * {@link InProcessPipelineRunner}. Contains state shared within the execution across all
+ * transforms.
+ *
+ * {@link InProcessEvaluationContext} contains shared state for an execution of the
+ * {@link InProcessPipelineRunner} that can be used while evaluating a {@link PTransform}. This
+ * consists of views into underlying state and watermark implementations, access to read and write
+ * {@link PCollectionView PCollectionViews}, and constructing {@link CounterSet CounterSets} and
+ * {@link ExecutionContext ExecutionContexts}. This includes executing callbacks asynchronously when
+ * state changes to the appropriate point (e.g. when a {@link PCollectionView} is requested and
+ * known to be empty).
+ *
+ *
{@link InProcessEvaluationContext} also handles results by committing finalizing bundles based
+ * on the current global state and updating the global state appropriately. This includes updating
+ * the per-{@link StepAndKey} state, updating global watermarks, and executing any callbacks that
+ * can be executed.
+ */
+class InProcessEvaluationContext {
+ /** The step name for each {@link AppliedPTransform} in the {@link Pipeline}. */
+ private final Map, String> stepNames;
+
+ /** The options that were used to create this {@link Pipeline}. */
+ private final InProcessPipelineOptions options;
+
+ /** The current processing time and event time watermarks and timers. */
+ private final InMemoryWatermarkManager watermarkManager;
+
+ /** Executes callbacks based on the progression of the watermark. */
+ private final WatermarkCallbackExecutor callbackExecutor;
+
+ /** The stateInternals of the world, by applied PTransform and key. */
+ private final ConcurrentMap>
+ applicationStateInternals;
+
+ private final InProcessSideInputContainer sideInputContainer;
+
+ private final CounterSet mergedCounters;
+
+ public static InProcessEvaluationContext create(
+ InProcessPipelineOptions options,
+ Collection> rootTransforms,
+ Map>> valueToConsumers,
+ Map, String> stepNames,
+ Collection> views) {
+ return new InProcessEvaluationContext(
+ options, rootTransforms, valueToConsumers, stepNames, views);
+ }
+
+ private InProcessEvaluationContext(
+ InProcessPipelineOptions options,
+ Collection> rootTransforms,
+ Map>> valueToConsumers,
+ Map, String> stepNames,
+ Collection> views) {
+ this.options = checkNotNull(options);
+ checkNotNull(rootTransforms);
+ checkNotNull(valueToConsumers);
+ checkNotNull(stepNames);
+ checkNotNull(views);
+ this.stepNames = stepNames;
+
+ this.watermarkManager =
+ InMemoryWatermarkManager.create(
+ NanosOffsetClock.create(), rootTransforms, valueToConsumers);
+ this.sideInputContainer = InProcessSideInputContainer.create(this, views);
+
+ this.applicationStateInternals = new ConcurrentHashMap<>();
+ this.mergedCounters = new CounterSet();
+
+ this.callbackExecutor = WatermarkCallbackExecutor.create();
+ }
+
+ /**
+ * Handle the provided {@link InProcessTransformResult}, produced after evaluating the provided
+ * {@link CommittedBundle} (potentially null, if the result of a root {@link PTransform}).
+ *
+ * The result is the output of running the transform contained in the
+ * {@link InProcessTransformResult} on the contents of the provided bundle.
+ *
+ * @param completedBundle the bundle that was processed to produce the result. Potentially
+ * {@code null} if the transform that produced the result is a root
+ * transform
+ * @param completedTimers the timers that were delivered to produce the {@code completedBundle},
+ * or an empty iterable if no timers were delivered
+ * @param result the result of evaluating the input bundle
+ * @return the committed bundles contained within the handled {@code result}
+ */
+ public synchronized Iterable extends CommittedBundle>> handleResult(
+ @Nullable CommittedBundle> completedBundle,
+ Iterable completedTimers,
+ InProcessTransformResult result) {
+ Iterable extends CommittedBundle>> committedBundles =
+ commitBundles(result.getOutputBundles());
+ // Update watermarks and timers
+ watermarkManager.updateWatermarks(
+ completedBundle,
+ result.getTransform(),
+ result.getTimerUpdate().withCompletedTimers(completedTimers),
+ committedBundles,
+ result.getWatermarkHold());
+ fireAllAvailableCallbacks();
+ // Update counters
+ if (result.getCounters() != null) {
+ mergedCounters.merge(result.getCounters());
+ }
+ // Update state internals
+ CopyOnAccessInMemoryStateInternals> theirState = result.getState();
+ if (theirState != null) {
+ CopyOnAccessInMemoryStateInternals> committedState = theirState.commit();
+ StepAndKey stepAndKey =
+ StepAndKey.of(
+ result.getTransform(), completedBundle == null ? null : completedBundle.getKey());
+ if (!committedState.isEmpty()) {
+ applicationStateInternals.put(stepAndKey, committedState);
+ } else {
+ applicationStateInternals.remove(stepAndKey);
+ }
+ }
+ return committedBundles;
+ }
+
+ private Iterable extends CommittedBundle>> commitBundles(
+ Iterable extends UncommittedBundle>> bundles) {
+ ImmutableList.Builder> completed = ImmutableList.builder();
+ for (UncommittedBundle> inProgress : bundles) {
+ AppliedPTransform, ?, ?> producing =
+ inProgress.getPCollection().getProducingTransformInternal();
+ TransformWatermarks watermarks = watermarkManager.getWatermarks(producing);
+ CommittedBundle> committed =
+ inProgress.commit(watermarks.getSynchronizedProcessingOutputTime());
+ // Empty bundles don't impact watermarks and shouldn't trigger downstream execution, so
+ // filter them out
+ if (!Iterables.isEmpty(committed.getElements())) {
+ completed.add(committed);
+ }
+ }
+ return completed.build();
+ }
+
+ private void fireAllAvailableCallbacks() {
+ for (AppliedPTransform, ?, ?> transform : stepNames.keySet()) {
+ fireAvailableCallbacks(transform);
+ }
+ }
+
+ private void fireAvailableCallbacks(AppliedPTransform, ?, ?> producingTransform) {
+ TransformWatermarks watermarks = watermarkManager.getWatermarks(producingTransform);
+ callbackExecutor.fireForWatermark(producingTransform, watermarks.getOutputWatermark());
+ }
+
+ /**
+ * Create a {@link UncommittedBundle} for use by a source.
+ */
+ public UncommittedBundle createRootBundle(PCollection output) {
+ return InProcessBundle.unkeyed(output);
+ }
+
+ /**
+ * Create a {@link UncommittedBundle} whose elements belong to the specified {@link
+ * PCollection}.
+ */
+ public UncommittedBundle createBundle(CommittedBundle> input, PCollection output) {
+ return input.isKeyed()
+ ? InProcessBundle.keyed(output, input.getKey())
+ : InProcessBundle.unkeyed(output);
+ }
+
+ /**
+ * Create a {@link UncommittedBundle} with the specified keys at the specified step. For use by
+ * {@link InProcessGroupByKeyOnly} {@link PTransform PTransforms}.
+ */
+ public UncommittedBundle createKeyedBundle(
+ CommittedBundle> input, Object key, PCollection output) {
+ return InProcessBundle.keyed(output, key);
+ }
+
+ /**
+ * Create a {@link PCollectionViewWriter}, whose elements will be used in the provided
+ * {@link PCollectionView}.
+ */
+ public PCollectionViewWriter createPCollectionViewWriter(
+ PCollection> input, final PCollectionView output) {
+ return new PCollectionViewWriter() {
+ @Override
+ public void add(Iterable> values) {
+ sideInputContainer.write(output, values);
+ }
+ };
+ }
+
+ /**
+ * Schedule a callback to be executed after output would be produced for the given window
+ * if there had been input.
+ *
+ * Output would be produced when the watermark for a {@link PValue} passes the point at
+ * which the trigger for the specified window (with the specified windowing strategy) must have
+ * fired from the perspective of that {@link PValue}, as specified by the value of
+ * {@link Trigger#getWatermarkThatGuaranteesFiring(BoundedWindow)} for the trigger of the
+ * {@link WindowingStrategy}. When the callback has fired, either values will have been produced
+ * for a key in that window, the window is empty, or all elements in the window are late. The
+ * callback will be executed regardless of whether values have been produced.
+ */
+ public void scheduleAfterOutputWouldBeProduced(
+ PValue value,
+ BoundedWindow window,
+ WindowingStrategy, ?> windowingStrategy,
+ Runnable runnable) {
+ AppliedPTransform, ?, ?> producing = getProducing(value);
+ callbackExecutor.callOnGuaranteedFiring(producing, window, windowingStrategy, runnable);
+
+ fireAvailableCallbacks(lookupProducing(value));
+ }
+
+ private AppliedPTransform, ?, ?> getProducing(PValue value) {
+ if (value.getProducingTransformInternal() != null) {
+ return value.getProducingTransformInternal();
+ }
+ return lookupProducing(value);
+ }
+
+ private AppliedPTransform, ?, ?> lookupProducing(PValue value) {
+ for (AppliedPTransform, ?, ?> transform : stepNames.keySet()) {
+ if (transform.getOutput().equals(value) || transform.getOutput().expand().contains(value)) {
+ return transform;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Get the options used by this {@link Pipeline}.
+ */
+ public InProcessPipelineOptions getPipelineOptions() {
+ return options;
+ }
+
+ /**
+ * Get an {@link ExecutionContext} for the provided {@link AppliedPTransform} and key.
+ */
+ public InProcessExecutionContext getExecutionContext(
+ AppliedPTransform, ?, ?> application, Object key) {
+ StepAndKey stepAndKey = StepAndKey.of(application, key);
+ return new InProcessExecutionContext(
+ options.getClock(),
+ key,
+ (CopyOnAccessInMemoryStateInternals) applicationStateInternals.get(stepAndKey),
+ watermarkManager.getWatermarks(application));
+ }
+
+ /**
+ * Get all of the steps used in this {@link Pipeline}.
+ */
+ public Collection> getSteps() {
+ return stepNames.keySet();
+ }
+
+ /**
+ * Get the Step Name for the provided application.
+ */
+ public String getStepName(AppliedPTransform, ?, ?> application) {
+ return stepNames.get(application);
+ }
+
+ /**
+ * Returns a {@link SideInputReader} capable of reading the provided
+ * {@link PCollectionView PCollectionViews}.
+ * @param sideInputs the {@link PCollectionView PCollectionViews} the result should be able to
+ * read
+ * @return a {@link SideInputReader} that can read all of the provided
+ * {@link PCollectionView PCollectionViews}
+ */
+ public SideInputReader createSideInputReader(final List> sideInputs) {
+ return sideInputContainer.createReaderForViews(sideInputs);
+ }
+
+ /**
+ * Create a {@link CounterSet} for this {@link Pipeline}. The {@link CounterSet} is independent
+ * of all other {@link CounterSet CounterSets} created by this call.
+ *
+ * The {@link InProcessEvaluationContext} is responsible for unifying the counters present in
+ * all created {@link CounterSet CounterSets} when the transforms that call this method
+ * complete.
+ */
+ public CounterSet createCounterSet() {
+ return new CounterSet();
+ }
+
+ /**
+ * Returns all of the counters that have been merged into this context via calls to
+ * {@link CounterSet#merge(CounterSet)}.
+ */
+ public CounterSet getCounters() {
+ return mergedCounters;
+ }
+
+ /**
+ * Extracts all timers that have been fired and have not already been extracted.
+ *
+ * This is a destructive operation. Timers will only appear in the result of this method once
+ * for each time they are set.
+ */
+ public Map, Map> extractFiredTimers() {
+ return watermarkManager.extractFiredTimers();
+ }
+
+ /**
+ * Returns true if all steps are done.
+ */
+ public boolean isDone() {
+ if (!options.isShutdownUnboundedProducersWithMaxWatermark() && containsUnboundedPCollection()) {
+ return false;
+ }
+ if (!watermarkManager.allWatermarksAtPositiveInfinity()) {
+ return false;
+ }
+ return true;
+ }
+
+ private boolean containsUnboundedPCollection() {
+ for (AppliedPTransform, ?, ?> transform : stepNames.keySet()) {
+ for (PValue value : transform.getInput().expand()) {
+ if (value instanceof PCollection
+ && ((PCollection>) value).isBounded().equals(IsBounded.UNBOUNDED)) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+}
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessExecutor.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessExecutor.java
new file mode 100644
index 0000000000..7b60bca17d
--- /dev/null
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessExecutor.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+
+import java.util.Collection;
+
+/**
+ * An executor that schedules and executes {@link AppliedPTransform AppliedPTransforms} for both
+ * source and intermediate {@link PTransform PTransforms}.
+ */
+interface InProcessExecutor {
+ /**
+ * Starts this executor. The provided collection is the collection of root transforms to
+ * initially schedule.
+ *
+ * @param rootTransforms
+ */
+ void start(Collection> rootTransforms);
+
+ /**
+ * Blocks until the job being executed enters a terminal state. A job is completed after all
+ * root {@link AppliedPTransform AppliedPTransforms} have completed, and all
+ * {@link CommittedBundle Bundles} have been consumed. Jobs may also terminate abnormally.
+ *
+ * @throws Throwable whenever an executor thread throws anything, transfers the throwable to the
+ * waiting thread and rethrows it
+ */
+ void awaitCompletion() throws Throwable;
+}
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java
index d659d962f0..27e9a4be6e 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java
@@ -15,10 +15,20 @@
*/
package com.google.cloud.dataflow.sdk.runners.inprocess;
+import com.google.cloud.dataflow.sdk.options.ApplicationNameOptions;
+import com.google.cloud.dataflow.sdk.options.Default;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
/**
* Options that can be used to configure the {@link InProcessPipelineRunner}.
*/
-public interface InProcessPipelineOptions extends PipelineOptions {}
+public interface InProcessPipelineOptions extends PipelineOptions, ApplicationNameOptions {
+ @Default.InstanceFactory(NanosOffsetClock.Factory.class)
+ Clock getClock();
+ void setClock(Clock clock);
+
+ boolean isShutdownUnboundedProducersWithMaxWatermark();
+
+ void setShutdownUnboundedProducersWithMaxWatermark(boolean shutdown);
+}
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
index 124de46b94..32859dae63 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
@@ -17,31 +17,21 @@
import static com.google.common.base.Preconditions.checkArgument;
-import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.annotations.Experimental;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.runners.inprocess.GroupByKeyEvaluatorFactory.InProcessGroupByKey;
import com.google.cloud.dataflow.sdk.runners.inprocess.ViewEvaluatorFactory.InProcessCreatePCollectionView;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
-import com.google.cloud.dataflow.sdk.transforms.GroupByKey.GroupByKeyOnly;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.View.CreatePCollectionView;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Trigger;
-import com.google.cloud.dataflow.sdk.util.ExecutionContext;
-import com.google.cloud.dataflow.sdk.util.SideInputReader;
import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
-import com.google.cloud.dataflow.sdk.util.common.CounterSet;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.cloud.dataflow.sdk.values.PValue;
import com.google.common.collect.ImmutableMap;
import org.joda.time.Instant;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -82,6 +72,11 @@ public class InProcessPipelineRunner {
* @param the type of elements that can be added to this bundle
*/
public static interface UncommittedBundle {
+ /**
+ * Returns the PCollection that the elements of this bundle belong to.
+ */
+ PCollection getPCollection();
+
/**
* Outputs an element to this bundle.
*
@@ -110,7 +105,7 @@ public static interface UncommittedBundle {
public static interface CommittedBundle {
/**
- * @return the PCollection that the elements of this bundle belong to
+ * Returns the PCollection that the elements of this bundle belong to.
*/
PCollection getPCollection();
@@ -154,107 +149,21 @@ public static interface PCollectionViewWriter {
void add(Iterable> values);
}
- /**
- * The evaluation context for the {@link InProcessPipelineRunner}. Contains state shared within
- * the current evaluation.
- */
- public static interface InProcessEvaluationContext {
- /**
- * Create a {@link UncommittedBundle} for use by a source.
- */
- UncommittedBundle createRootBundle(PCollection output);
-
- /**
- * Create a {@link UncommittedBundle} whose elements belong to the specified {@link
- * PCollection}.
- */
- UncommittedBundle createBundle(CommittedBundle> input, PCollection output);
-
- /**
- * Create a {@link UncommittedBundle} with the specified keys at the specified step. For use by
- * {@link GroupByKeyOnly} {@link PTransform PTransforms}.
- */
- UncommittedBundle createKeyedBundle(
- CommittedBundle> input, Object key, PCollection output);
-
- /**
- * Create a bundle whose elements will be used in a PCollectionView.
- */
- PCollectionViewWriter createPCollectionViewWriter(
- PCollection> input, PCollectionView output);
-
- /**
- * Get the options used by this {@link Pipeline}.
- */
- InProcessPipelineOptions getPipelineOptions();
-
- /**
- * Get an {@link ExecutionContext} for the provided application.
- */
- InProcessExecutionContext getExecutionContext(
- AppliedPTransform, ?, ?> application, @Nullable Object key);
-
- /**
- * Get the Step Name for the provided application.
- */
- String getStepName(AppliedPTransform, ?, ?> application);
-
- /**
- * @param sideInputs the {@link PCollectionView PCollectionViews} the result should be able to
- * read
- * @return a {@link SideInputReader} that can read all of the provided
- * {@link PCollectionView PCollectionViews}
- */
- SideInputReader createSideInputReader(List> sideInputs);
-
- /**
- * Schedules a callback after the watermark for a {@link PValue} after the trigger for the
- * specified window (with the specified windowing strategy) must have fired from the perspective
- * of that {@link PValue}, as specified by the value of
- * {@link Trigger#getWatermarkThatGuaranteesFiring(BoundedWindow)} for the trigger of the
- * {@link WindowingStrategy}.
- */
- void callAfterOutputMustHaveBeenProduced(PValue value, BoundedWindow window,
- WindowingStrategy, ?> windowingStrategy, Runnable runnable);
+ ////////////////////////////////////////////////////////////////////////////////////////////////
+ private final InProcessPipelineOptions options;
- /**
- * Create a {@link CounterSet} for this {@link Pipeline}. The {@link CounterSet} is independent
- * of all other {@link CounterSet CounterSets} created by this call.
- *
- * The {@link InProcessEvaluationContext} is responsible for unifying the counters present in
- * all created {@link CounterSet CounterSets} when the transforms that call this method
- * complete.
- */
- CounterSet createCounterSet();
+ public static InProcessPipelineRunner fromOptions(PipelineOptions options) {
+ return new InProcessPipelineRunner(options.as(InProcessPipelineOptions.class));
+ }
- /**
- * Returns all of the counters that have been merged into this context via calls to
- * {@link CounterSet#merge(CounterSet)}.
- */
- CounterSet getCounters();
+ private InProcessPipelineRunner(InProcessPipelineOptions options) {
+ this.options = options;
}
/**
- * An executor that schedules and executes {@link AppliedPTransform AppliedPTransforms} for both
- * source and intermediate {@link PTransform PTransforms}.
+ * Returns the {@link PipelineOptions} used to create this {@link InProcessPipelineRunner}.
*/
- public static interface InProcessExecutor {
- /**
- * @param root the root {@link AppliedPTransform} to schedule
- */
- void scheduleRoot(AppliedPTransform, ?, ?> root);
-
- /**
- * @param consumer the {@link AppliedPTransform} to schedule
- * @param bundle the input bundle to the consumer
- */
- void scheduleConsumption(AppliedPTransform, ?, ?> consumer, CommittedBundle> bundle);
-
- /**
- * Blocks until the job being executed enters a terminal state. A job is completed after all
- * root {@link AppliedPTransform AppliedPTransforms} have completed, and all
- * {@link CommittedBundle Bundles} have been consumed. Jobs may also terminate abnormally.
- */
- void awaitCompletion();
+ public InProcessPipelineOptions getPipelineOptions() {
+ return options;
}
}
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainer.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainer.java
index bf9a2e1c53..37c9fcfa65 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainer.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainer.java
@@ -17,7 +17,6 @@
import static com.google.common.base.Preconditions.checkArgument;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
import com.google.cloud.dataflow.sdk.util.PCollectionViewWindow;
@@ -26,6 +25,7 @@
import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
import com.google.cloud.dataflow.sdk.values.PCollectionView;
import com.google.common.base.MoreObjects;
+import com.google.common.base.Throwables;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
@@ -89,7 +89,7 @@ private InProcessSideInputContainer(InProcessEvaluationContext context,
* the provided argument. The returned {@link InProcessSideInputContainer} is unmodifiable without
* casting, but will change as this {@link InProcessSideInputContainer} is modified.
*/
- public SideInputReader withViews(Collection> newContainedViews) {
+ public SideInputReader createReaderForViews(Collection> newContainedViews) {
if (!containedViews.containsAll(newContainedViews)) {
Set> currentlyContained = ImmutableSet.copyOf(containedViews);
Set> newRequested = ImmutableSet.copyOf(newContainedViews);
@@ -108,8 +108,20 @@ public SideInputReader withViews(Collection> newContainedView
*
* The provided iterable is expected to contain only a single window and pane.
*/
- public void write(PCollectionView> view, Iterable extends WindowedValue>> values)
- throws ExecutionException {
+ public void write(PCollectionView> view, Iterable extends WindowedValue>> values) {
+ Map>> valuesPerWindow =
+ indexValuesByWindow(values);
+ for (Map.Entry>> windowValues :
+ valuesPerWindow.entrySet()) {
+ updatePCollectionViewWindowValues(view, windowValues.getKey(), windowValues.getValue());
+ }
+ }
+
+ /**
+ * Index the provided values by all {@link BoundedWindow windows} in which they appear.
+ */
+ private Map>> indexValuesByWindow(
+ Iterable extends WindowedValue>> values) {
Map>> valuesPerWindow = new HashMap<>();
for (WindowedValue> value : values) {
for (BoundedWindow window : value.getWindows()) {
@@ -121,29 +133,40 @@ public void write(PCollectionView> view, Iterable extends WindowedValue>>
windowValues.add(value);
}
}
- for (Map.Entry>> windowValues :
- valuesPerWindow.entrySet()) {
- PCollectionViewWindow> windowedView = PCollectionViewWindow.of(view, windowValues.getKey());
- SettableFuture>> future = viewByWindows.get(windowedView);
+ return valuesPerWindow;
+ }
+
+ /**
+ * Set the value of the {@link PCollectionView} in the {@link BoundedWindow} to be based on the
+ * specified values, if the values are part of a later pane than currently exist within the
+ * {@link PCollectionViewWindow}.
+ */
+ private void updatePCollectionViewWindowValues(
+ PCollectionView> view, BoundedWindow window, Collection> windowValues) {
+ PCollectionViewWindow> windowedView = PCollectionViewWindow.of(view, window);
+ SettableFuture>> future = null;
+ try {
+ future = viewByWindows.get(windowedView);
if (future.isDone()) {
- try {
- Iterator extends WindowedValue>> existingValues = future.get().iterator();
- PaneInfo newPane = windowValues.getValue().iterator().next().getPane();
- // The current value may have no elements, if no elements were produced for the window,
- // but we are recieving late data.
- if (!existingValues.hasNext()
- || newPane.getIndex() > existingValues.next().getPane().getIndex()) {
- viewByWindows.invalidate(windowedView);
- viewByWindows.get(windowedView).set(windowValues.getValue());
- }
- } catch (InterruptedException e) {
- // TODO: Handle meaningfully. This should never really happen when the result remains
- // useful, but the result could be available and the thread can still be interrupted.
- Thread.currentThread().interrupt();
+ Iterator extends WindowedValue>> existingValues = future.get().iterator();
+ PaneInfo newPane = windowValues.iterator().next().getPane();
+ // The current value may have no elements, if no elements were produced for the window,
+ // but we are recieving late data.
+ if (!existingValues.hasNext()
+ || newPane.getIndex() > existingValues.next().getPane().getIndex()) {
+ viewByWindows.invalidate(windowedView);
+ viewByWindows.get(windowedView).set(windowValues);
}
} else {
- future.set(windowValues.getValue());
+ future.set(windowValues);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ if (future != null && !future.isDone()) {
+ future.set(Collections.>emptyList());
}
+ } catch (ExecutionException e) {
+ Throwables.propagate(e.getCause());
}
}
@@ -165,7 +188,7 @@ public T get(final PCollectionView view, final BoundedWindow window) {
viewByWindows.get(windowedView);
WindowingStrategy, ?> windowingStrategy = view.getWindowingStrategyInternal();
- evaluationContext.callAfterOutputMustHaveBeenProduced(
+ evaluationContext.scheduleAfterOutputWouldBeProduced(
view, window, windowingStrategy, new Runnable() {
@Override
public void run() {
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java
index e3ae1a028c..24142c2151 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java
@@ -17,7 +17,6 @@
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessExecutionContext.InProcessStepContext;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
import com.google.cloud.dataflow.sdk.runners.inprocess.ParDoInProcessEvaluator.BundleOutputManager;
import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java
index cd79c219bd..af5914bab0 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java
@@ -17,7 +17,6 @@
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessExecutionContext.InProcessStepContext;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
import com.google.cloud.dataflow.sdk.runners.inprocess.ParDoInProcessEvaluator.BundleOutputManager;
import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/StepAndKey.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/StepAndKey.java
new file mode 100644
index 0000000000..15955724eb
--- /dev/null
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/StepAndKey.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.common.base.MoreObjects;
+
+import java.util.Objects;
+
+/**
+ * A (Step, Key) pair. This is useful as a map key or cache key for things that are available
+ * per-step in a keyed manner (e.g. State).
+ */
+final class StepAndKey {
+ private final AppliedPTransform, ?, ?> step;
+ private final Object key;
+
+ /**
+ * Create a new {@link StepAndKey} with the provided step and key.
+ */
+ public static StepAndKey of(AppliedPTransform, ?, ?> step, Object key) {
+ return new StepAndKey(step, key);
+ }
+
+ private StepAndKey(AppliedPTransform, ?, ?> step, Object key) {
+ this.step = step;
+ this.key = key;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(StepAndKey.class)
+ .add("step", step.getFullName())
+ .add("key", key)
+ .toString();
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(step, key);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == this) {
+ return true;
+ } else if (!(other instanceof StepAndKey)) {
+ return false;
+ } else {
+ StepAndKey that = (StepAndKey) other;
+ return Objects.equals(this.step, that.step)
+ && Objects.equals(this.key, that.key);
+ }
+ }
+}
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformEvaluatorFactory.java
index 3b672e0def..860ddfe48f 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformEvaluatorFactory.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformEvaluatorFactory.java
@@ -16,7 +16,6 @@
package com.google.cloud.dataflow.sdk.runners.inprocess;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext;
import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformEvaluatorRegistry.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformEvaluatorRegistry.java
new file mode 100644
index 0000000000..0c8cb7e80a
--- /dev/null
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformEvaluatorRegistry.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import com.google.cloud.dataflow.sdk.io.Read;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.transforms.Flatten.FlattenPCollectionList;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.common.collect.ImmutableMap;
+
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+/**
+ * A {@link TransformEvaluatorFactory} that delegates to primitive {@link TransformEvaluatorFactory}
+ * implementations based on the type of {@link PTransform} of the application.
+ */
+class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
+ public static TransformEvaluatorRegistry defaultRegistry() {
+ @SuppressWarnings("rawtypes")
+ ImmutableMap, TransformEvaluatorFactory> primitives =
+ ImmutableMap., TransformEvaluatorFactory>builder()
+ .put(Read.Bounded.class, new BoundedReadEvaluatorFactory())
+ .put(Read.Unbounded.class, new UnboundedReadEvaluatorFactory())
+ .put(ParDo.Bound.class, new ParDoSingleEvaluatorFactory())
+ .put(ParDo.BoundMulti.class, new ParDoMultiEvaluatorFactory())
+ .put(
+ GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly.class,
+ new GroupByKeyEvaluatorFactory())
+ .put(FlattenPCollectionList.class, new FlattenEvaluatorFactory())
+ .put(ViewEvaluatorFactory.WriteView.class, new ViewEvaluatorFactory())
+ .build();
+ return new TransformEvaluatorRegistry(primitives);
+ }
+
+ // the TransformEvaluatorFactories can construct instances of all generic types of transform,
+ // so all instances of a primitive can be handled with the same evaluator factory.
+ @SuppressWarnings("rawtypes")
+ private final Map, TransformEvaluatorFactory> factories;
+
+ private TransformEvaluatorRegistry(
+ @SuppressWarnings("rawtypes")
+ Map, TransformEvaluatorFactory> factories) {
+ this.factories = factories;
+ }
+
+ @Override
+ public TransformEvaluator forApplication(
+ AppliedPTransform, ?, ?> application,
+ @Nullable CommittedBundle> inputBundle,
+ InProcessEvaluationContext evaluationContext)
+ throws Exception {
+ TransformEvaluatorFactory factory = factories.get(application.getTransform().getClass());
+ return factory.forApplication(application, inputBundle, evaluationContext);
+ }
+}
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutor.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutor.java
new file mode 100644
index 0000000000..d630749387
--- /dev/null
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutor.java
@@ -0,0 +1,114 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.common.base.Throwables;
+
+import java.util.concurrent.Callable;
+
+import javax.annotation.Nullable;
+
+/**
+ * A {@link Callable} responsible for constructing a {@link TransformEvaluator} from a
+ * {@link TransformEvaluatorFactory} and evaluating it on some bundle of input, and registering
+ * the result using a registered {@link CompletionCallback}.
+ *
+ * A {@link TransformExecutor} that is currently executing also provides access to the thread
+ * that it is being executed on.
+ */
+class TransformExecutor implements Callable {
+ public static TransformExecutor create(
+ TransformEvaluatorFactory factory,
+ InProcessEvaluationContext evaluationContext,
+ CommittedBundle inputBundle,
+ AppliedPTransform, ?, ?> transform,
+ CompletionCallback completionCallback,
+ TransformExecutorService transformEvaluationState) {
+ return new TransformExecutor<>(
+ factory,
+ evaluationContext,
+ inputBundle,
+ transform,
+ completionCallback,
+ transformEvaluationState);
+ }
+
+ private final TransformEvaluatorFactory evaluatorFactory;
+ private final InProcessEvaluationContext evaluationContext;
+
+ /** The transform that will be evaluated. */
+ private final AppliedPTransform, ?, ?> transform;
+ /** The inputs this {@link TransformExecutor} will deliver to the transform. */
+ private final CommittedBundle inputBundle;
+
+ private final CompletionCallback onComplete;
+ private final TransformExecutorService transformEvaluationState;
+
+ private Thread thread;
+
+ private TransformExecutor(
+ TransformEvaluatorFactory factory,
+ InProcessEvaluationContext evaluationContext,
+ CommittedBundle inputBundle,
+ AppliedPTransform, ?, ?> transform,
+ CompletionCallback completionCallback,
+ TransformExecutorService transformEvaluationState) {
+ this.evaluatorFactory = factory;
+ this.evaluationContext = evaluationContext;
+
+ this.inputBundle = inputBundle;
+ this.transform = transform;
+
+ this.onComplete = completionCallback;
+
+ this.transformEvaluationState = transformEvaluationState;
+ }
+
+ @Override
+ public InProcessTransformResult call() {
+ this.thread = Thread.currentThread();
+ try {
+ TransformEvaluator evaluator =
+ evaluatorFactory.forApplication(transform, inputBundle, evaluationContext);
+ if (inputBundle != null) {
+ for (WindowedValue value : inputBundle.getElements()) {
+ evaluator.processElement(value);
+ }
+ }
+ InProcessTransformResult result = evaluator.finishBundle();
+ onComplete.handleResult(inputBundle, result);
+ return result;
+ } catch (Throwable t) {
+ onComplete.handleThrowable(inputBundle, t);
+ throw Throwables.propagate(t);
+ } finally {
+ this.thread = null;
+ transformEvaluationState.complete(this);
+ }
+ }
+
+ /**
+ * If this {@link TransformExecutor} is currently executing, return the thread it is executing in.
+ * Otherwise, return null.
+ */
+ @Nullable
+ public Thread getThread() {
+ return this.thread;
+ }
+}
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorService.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorService.java
new file mode 100644
index 0000000000..3f00da6ebe
--- /dev/null
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorService.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+/**
+ * Schedules and completes {@link TransformExecutor TransformExecutors}, controlling concurrency as
+ * appropriate for the {@link StepAndKey} the executor exists for.
+ */
+interface TransformExecutorService {
+ /**
+ * Schedule the provided work to be eventually executed.
+ */
+ void schedule(TransformExecutor> work);
+
+ /**
+ * Finish executing the provided work. This may cause additional
+ * {@link TransformExecutor TransformExecutors} to be evaluated.
+ */
+ void complete(TransformExecutor> completed);
+}
+
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorServices.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorServices.java
new file mode 100644
index 0000000000..34efdf694e
--- /dev/null
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorServices.java
@@ -0,0 +1,153 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import com.google.common.base.MoreObjects;
+
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Static factory methods for constructing instances of {@link TransformExecutorService}.
+ */
+final class TransformExecutorServices {
+ private TransformExecutorServices() {
+ // Do not instantiate
+ }
+
+ /**
+ * Returns an EvaluationState that evaluates {@link TransformExecutor TransformExecutors} in
+ * parallel.
+ */
+ public static TransformExecutorService parallel(
+ ExecutorService executor, Map, Boolean> scheduled) {
+ return new ParallelEvaluationState(executor, scheduled);
+ }
+
+ /**
+ * Returns an EvaluationState that evaluates {@link TransformExecutor TransformExecutors} in
+ * serial.
+ */
+ public static TransformExecutorService serial(
+ ExecutorService executor, Map, Boolean> scheduled) {
+ return new SerialEvaluationState(executor, scheduled);
+ }
+
+ /**
+ * A {@link TransformExecutorService} with unlimited parallelism. Any {@link TransformExecutor}
+ * scheduled will be immediately submitted to the {@link ExecutorService}.
+ *
+ * A principal use of this is for the evaluation of an unkeyed Step. Unkeyed computations are
+ * processed in parallel.
+ */
+ private static class ParallelEvaluationState implements TransformExecutorService {
+ private final ExecutorService executor;
+ private final Map, Boolean> scheduled;
+
+ private ParallelEvaluationState(
+ ExecutorService executor, Map, Boolean> scheduled) {
+ this.executor = executor;
+ this.scheduled = scheduled;
+ }
+
+ @Override
+ public void schedule(TransformExecutor> work) {
+ executor.submit(work);
+ scheduled.put(work, true);
+ }
+
+ @Override
+ public void complete(TransformExecutor> completed) {
+ scheduled.remove(completed);
+ }
+ }
+
+ /**
+ * A {@link TransformExecutorService} with a single work queue. Any {@link TransformExecutor}
+ * scheduled will be placed on the work queue. Only one item of work will be submitted to the
+ * {@link ExecutorService} at any time.
+ *
+ * A principal use of this is for the serial evaluation of a (Step, Key) pair.
+ * Keyed computations are processed serially per step.
+ */
+ private static class SerialEvaluationState implements TransformExecutorService {
+ private final ExecutorService executor;
+ private final Map, Boolean> scheduled;
+
+ private AtomicReference> currentlyEvaluating;
+ private final Queue> workQueue;
+
+ private SerialEvaluationState(
+ ExecutorService executor, Map, Boolean> scheduled) {
+ this.scheduled = scheduled;
+ this.executor = executor;
+ this.currentlyEvaluating = new AtomicReference<>();
+ this.workQueue = new ConcurrentLinkedQueue<>();
+ }
+
+ /**
+ * Schedules the work, adding it to the work queue if there is a bundle currently being
+ * evaluated and scheduling it immediately otherwise.
+ */
+ @Override
+ public void schedule(TransformExecutor> work) {
+ workQueue.offer(work);
+ updateCurrentlyEvaluating();
+ }
+
+ @Override
+ public void complete(TransformExecutor> completed) {
+ if (!currentlyEvaluating.compareAndSet(completed, null)) {
+ throw new IllegalStateException(
+ "Finished work "
+ + completed
+ + " but could not complete due to unexpected currently executing "
+ + currentlyEvaluating.get());
+ }
+ scheduled.remove(completed);
+ updateCurrentlyEvaluating();
+ }
+
+ private void updateCurrentlyEvaluating() {
+ if (currentlyEvaluating.get() == null) {
+ // Only synchronize if we need to update what's currently evaluating
+ synchronized (this) {
+ TransformExecutor> newWork = workQueue.poll();
+ if (newWork != null) {
+ if (currentlyEvaluating.compareAndSet(null, newWork)) {
+ scheduled.put(newWork, true);
+ executor.submit(newWork);
+ } else {
+ workQueue.offer(newWork);
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(SerialEvaluationState.class)
+ .add("currentlyEvaluating", currentlyEvaluating)
+ .add("workQueue", workQueue)
+ .toString();
+ }
+ }
+}
+
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java
index 4beac337d6..97f0e25d38 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java
@@ -21,7 +21,6 @@
import com.google.cloud.dataflow.sdk.io.UnboundedSource.UnboundedReader;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactory.java
index f47cd1de98..314d81f6aa 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactory.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactory.java
@@ -17,7 +17,6 @@
import com.google.cloud.dataflow.sdk.coders.KvCoder;
import com.google.cloud.dataflow.sdk.coders.VoidCoder;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.PCollectionViewWriter;
import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/WatermarkCallbackExecutor.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/WatermarkCallbackExecutor.java
new file mode 100644
index 0000000000..27d59b9a64
--- /dev/null
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/WatermarkCallbackExecutor.java
@@ -0,0 +1,143 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
+import com.google.common.collect.ComparisonChain;
+import com.google.common.collect.Ordering;
+
+import org.joda.time.Instant;
+
+import java.util.PriorityQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Executes callbacks that occur based on the progression of the watermark per-step.
+ *
+ * Callbacks are registered by calls to
+ * {@link #callOnGuaranteedFiring(AppliedPTransform, BoundedWindow, WindowingStrategy, Runnable)},
+ * and are executed after a call to {@link #fireForWatermark(AppliedPTransform, Instant)} with the
+ * same {@link AppliedPTransform} and a watermark sufficient to ensure that the trigger for the
+ * windowing strategy would have been produced.
+ *
+ *
NOTE: {@link WatermarkCallbackExecutor} does not track the latest observed watermark for any
+ * {@link AppliedPTransform} - any call to
+ * {@link #callOnGuaranteedFiring(AppliedPTransform, BoundedWindow, WindowingStrategy, Runnable)}
+ * that could have potentially already fired should be followed by a call to
+ * {@link #fireForWatermark(AppliedPTransform, Instant)} for the same transform with the current
+ * value of the watermark.
+ */
+class WatermarkCallbackExecutor {
+ /**
+ * Create a new {@link WatermarkCallbackExecutor}.
+ */
+ public static WatermarkCallbackExecutor create() {
+ return new WatermarkCallbackExecutor();
+ }
+
+ private final ConcurrentMap, PriorityQueue>
+ callbacks;
+ private final ExecutorService executor;
+
+ private WatermarkCallbackExecutor() {
+ this.callbacks = new ConcurrentHashMap<>();
+ this.executor = Executors.newSingleThreadExecutor();
+ }
+
+ /**
+ * Execute the provided {@link Runnable} after the next call to
+ * {@link #fireForWatermark(AppliedPTransform, Instant)} where the window is guaranteed to have
+ * produced output.
+ */
+ public void callOnGuaranteedFiring(
+ AppliedPTransform, ?, ?> step,
+ BoundedWindow window,
+ WindowingStrategy, ?> windowingStrategy,
+ Runnable runnable) {
+ WatermarkCallback callback =
+ WatermarkCallback.onGuaranteedFiring(window, windowingStrategy, runnable);
+
+ PriorityQueue callbackQueue = callbacks.get(step);
+ if (callbackQueue == null) {
+ callbackQueue = new PriorityQueue<>(11, new CallbackOrdering());
+ if (callbacks.putIfAbsent(step, callbackQueue) != null) {
+ callbackQueue = callbacks.get(step);
+ }
+ }
+
+ synchronized (callbackQueue) {
+ callbackQueue.offer(callback);
+ }
+ }
+
+ /**
+ * Schedule all pending callbacks that must have produced output by the time of the provided
+ * watermark.
+ */
+ public void fireForWatermark(AppliedPTransform, ?, ?> step, Instant watermark) {
+ PriorityQueue callbackQueue = callbacks.get(step);
+ if (callbackQueue == null) {
+ return;
+ }
+ synchronized (callbackQueue) {
+ while (!callbackQueue.isEmpty() && callbackQueue.peek().shouldFire(watermark)) {
+ executor.submit(callbackQueue.poll().getCallback());
+ }
+ }
+ }
+
+ private static class WatermarkCallback {
+ public static WatermarkCallback onGuaranteedFiring(
+ BoundedWindow window, WindowingStrategy, W> strategy, Runnable callback) {
+ @SuppressWarnings("unchecked")
+ Instant firingAfter =
+ strategy.getTrigger().getSpec().getWatermarkThatGuaranteesFiring((W) window);
+ return new WatermarkCallback(firingAfter, callback);
+ }
+
+ private final Instant fireAfter;
+ private final Runnable callback;
+
+ private WatermarkCallback(Instant fireAfter, Runnable callback) {
+ this.fireAfter = fireAfter;
+ this.callback = callback;
+ }
+
+ public boolean shouldFire(Instant currentWatermark) {
+ return currentWatermark.isAfter(fireAfter)
+ || currentWatermark.equals(BoundedWindow.TIMESTAMP_MAX_VALUE);
+ }
+
+ public Runnable getCallback() {
+ return callback;
+ }
+ }
+
+ private static class CallbackOrdering extends Ordering {
+ @Override
+ public int compare(WatermarkCallback left, WatermarkCallback right) {
+ return ComparisonChain.start()
+ .compare(left.fireAfter, right.fireAfter)
+ .compare(left.callback, right.callback, Ordering.arbitrary())
+ .result();
+ }
+ }
+}
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Combine.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Combine.java
index cc0347a124..b8d20e303f 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Combine.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Combine.java
@@ -1690,21 +1690,9 @@ public List> getSideInputs() {
@Override
public PCollection> apply(PCollection> input) {
- if (fn instanceof RequiresContextInternal) {
- return input
- .apply(GroupByKey.create(fewKeys))
- .apply(ParDo.of(new DoFn>, KV>>() {
- @Override
- public void processElement(ProcessContext c) throws Exception {
- c.output(c.element());
- }
- }))
- .apply(Combine.groupedValues(fn).withSideInputs(sideInputs));
- } else {
- return input
- .apply(GroupByKey.create(fewKeys))
- .apply(Combine.groupedValues(fn).withSideInputs(sideInputs));
- }
+ return input
+ .apply(GroupByKey.create(fewKeys))
+ .apply(Combine.groupedValues(fn).withSideInputs(sideInputs));
}
}
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/CombineFns.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/CombineFns.java
new file mode 100644
index 0000000000..656c010d91
--- /dev/null
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/CombineFns.java
@@ -0,0 +1,1100 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.transforms;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.CoderException;
+import com.google.cloud.dataflow.sdk.coders.CoderRegistry;
+import com.google.cloud.dataflow.sdk.coders.StandardCoder;
+import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn;
+import com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn;
+import com.google.cloud.dataflow.sdk.transforms.CombineFnBase.GlobalCombineFn;
+import com.google.cloud.dataflow.sdk.transforms.CombineFnBase.PerKeyCombineFn;
+import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.CombineFnWithContext;
+import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.Context;
+import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import com.google.cloud.dataflow.sdk.util.PropertyNames;
+import com.google.cloud.dataflow.sdk.values.TupleTag;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * Static utility methods that create combine function instances.
+ */
+public class CombineFns {
+
+ /**
+ * Returns a {@link ComposeKeyedCombineFnBuilder} to construct a composed
+ * {@link PerKeyCombineFn}.
+ *
+ * The same {@link TupleTag} cannot be used in a composition multiple times.
+ *
+ *
Example:
+ *
{ @code
+ * PCollection> latencies = ...;
+ *
+ * TupleTag maxLatencyTag = new TupleTag();
+ * TupleTag meanLatencyTag = new TupleTag();
+ *
+ * SimpleFunction identityFn =
+ * new SimpleFunction() {
+ * @Override
+ * public Integer apply(Integer input) {
+ * return input;
+ * }};
+ * PCollection> maxAndMean = latencies.apply(
+ * Combine.perKey(
+ * CombineFns.composeKeyed()
+ * .with(identityFn, new MaxIntegerFn(), maxLatencyTag)
+ * .with(identityFn, new MeanFn(), meanLatencyTag)));
+ *
+ * PCollection finalResultCollection = maxAndMean
+ * .apply(ParDo.of(
+ * new DoFn, T>() {
+ * @Override
+ * public void processElement(ProcessContext c) throws Exception {
+ * KV e = c.element();
+ * Integer maxLatency = e.getValue().get(maxLatencyTag);
+ * Double meanLatency = e.getValue().get(meanLatencyTag);
+ * .... Do Something ....
+ * c.output(...some T...);
+ * }
+ * }));
+ * }
+ */
+ public static ComposeKeyedCombineFnBuilder composeKeyed() {
+ return new ComposeKeyedCombineFnBuilder();
+ }
+
+ /**
+ * Returns a {@link ComposeCombineFnBuilder} to construct a composed
+ * {@link GlobalCombineFn}.
+ *
+ * The same {@link TupleTag} cannot be used in a composition multiple times.
+ *
+ *
Example:
+ *
{ @code
+ * PCollection globalLatencies = ...;
+ *
+ * TupleTag maxLatencyTag = new TupleTag();
+ * TupleTag meanLatencyTag = new TupleTag();
+ *
+ * SimpleFunction identityFn =
+ * new SimpleFunction() {
+ * @Override
+ * public Integer apply(Integer input) {
+ * return input;
+ * }};
+ * PCollection maxAndMean = globalLatencies.apply(
+ * Combine.globally(
+ * CombineFns.compose()
+ * .with(identityFn, new MaxIntegerFn(), maxLatencyTag)
+ * .with(identityFn, new MeanFn(), meanLatencyTag)));
+ *
+ * PCollection finalResultCollection = maxAndMean
+ * .apply(ParDo.of(
+ * new DoFn() {
+ * @Override
+ * public void processElement(ProcessContext c) throws Exception {
+ * CoCombineResult e = c.element();
+ * Integer maxLatency = e.get(maxLatencyTag);
+ * Double meanLatency = e.get(meanLatencyTag);
+ * .... Do Something ....
+ * c.output(...some T...);
+ * }
+ * }));
+ * }
+ */
+ public static ComposeCombineFnBuilder compose() {
+ return new ComposeCombineFnBuilder();
+ }
+
+ /////////////////////////////////////////////////////////////////////////////
+
+ /**
+ * A builder class to construct a composed {@link PerKeyCombineFn}.
+ */
+ public static class ComposeKeyedCombineFnBuilder {
+ /**
+ * Returns a {@link ComposedKeyedCombineFn} that can take additional
+ * {@link PerKeyCombineFn PerKeyCombineFns} and apply them as a single combine function.
+ *
+ * The {@link ComposedKeyedCombineFn} extracts inputs from {@code DataT} with
+ * the {@code extractInputFn} and combines them with the {@code keyedCombineFn},
+ * and then it outputs each combined value with a {@link TupleTag} to a
+ * {@link CoCombineResult}.
+ */
+ public ComposedKeyedCombineFn with(
+ SimpleFunction extractInputFn,
+ KeyedCombineFn keyedCombineFn,
+ TupleTag outputTag) {
+ return new ComposedKeyedCombineFn()
+ .with(extractInputFn, keyedCombineFn, outputTag);
+ }
+
+ /**
+ * Returns a {@link ComposedKeyedCombineFnWithContext} that can take additional
+ * {@link PerKeyCombineFn PerKeyCombineFns} and apply them as a single combine function.
+ *
+ * The {@link ComposedKeyedCombineFnWithContext} extracts inputs from {@code DataT} with
+ * the {@code extractInputFn} and combines them with the {@code keyedCombineFnWithContext},
+ * and then it outputs each combined value with a {@link TupleTag} to a
+ * {@link CoCombineResult}.
+ */
+ public ComposedKeyedCombineFnWithContext with(
+ SimpleFunction extractInputFn,
+ KeyedCombineFnWithContext keyedCombineFnWithContext,
+ TupleTag outputTag) {
+ return new ComposedKeyedCombineFnWithContext()
+ .with(extractInputFn, keyedCombineFnWithContext, outputTag);
+ }
+
+ /**
+ * Returns a {@link ComposedKeyedCombineFn} that can take additional
+ * {@link PerKeyCombineFn PerKeyCombineFns} and apply them as a single combine function.
+ */
+ public ComposedKeyedCombineFn with(
+ SimpleFunction extractInputFn,
+ CombineFn combineFn,
+ TupleTag outputTag) {
+ return with(extractInputFn, combineFn.asKeyedFn(), outputTag);
+ }
+
+ /**
+ * Returns a {@link ComposedKeyedCombineFnWithContext} that can take additional
+ * {@link PerKeyCombineFn PerKeyCombineFns} and apply them as a single combine function.
+ */
+ public ComposedKeyedCombineFnWithContext with(
+ SimpleFunction extractInputFn,
+ CombineFnWithContext combineFnWithContext,
+ TupleTag outputTag) {
+ return with(extractInputFn, combineFnWithContext.asKeyedFn(), outputTag);
+ }
+ }
+
+ /**
+ * A builder class to construct a composed {@link GlobalCombineFn}.
+ */
+ public static class ComposeCombineFnBuilder {
+ /**
+ * Returns a {@link ComposedCombineFn} that can take additional
+ * {@link GlobalCombineFn GlobalCombineFns} and apply them as a single combine function.
+ *
+ * The {@link ComposedCombineFn} extracts inputs from {@code DataT} with
+ * the {@code extractInputFn} and combines them with the {@code combineFn},
+ * and then it outputs each combined value with a {@link TupleTag} to a
+ * {@link CoCombineResult}.
+ */
+ public ComposedCombineFn with(
+ SimpleFunction extractInputFn,
+ CombineFn combineFn,
+ TupleTag outputTag) {
+ return new ComposedCombineFn()
+ .with(extractInputFn, combineFn, outputTag);
+ }
+
+ /**
+ * Returns a {@link ComposedCombineFnWithContext} that can take additional
+ * {@link GlobalCombineFn GlobalCombineFns} and apply them as a single combine function.
+ *
+ * The {@link ComposedCombineFnWithContext} extracts inputs from {@code DataT} with
+ * the {@code extractInputFn} and combines them with the {@code combineFnWithContext},
+ * and then it outputs each combined value with a {@link TupleTag} to a
+ * {@link CoCombineResult}.
+ */
+ public ComposedCombineFnWithContext with(
+ SimpleFunction extractInputFn,
+ CombineFnWithContext combineFnWithContext,
+ TupleTag outputTag) {
+ return new ComposedCombineFnWithContext()
+ .with(extractInputFn, combineFnWithContext, outputTag);
+ }
+ }
+
+ /////////////////////////////////////////////////////////////////////////////
+
+ /**
+ * A tuple of outputs produced by a composed combine functions.
+ *
+ * See {@link #compose()} or {@link #composeKeyed()}) for details.
+ */
+ public static class CoCombineResult implements Serializable {
+
+ private enum NullValue {
+ INSTANCE;
+ }
+
+ private final Map, Object> valuesMap;
+
+ /**
+ * The constructor of {@link CoCombineResult}.
+ *
+ * Null values should have been filtered out from the {@code valuesMap}.
+ * {@link TupleTag TupleTags} that associate with null values doesn't exist in the key set of
+ * {@code valuesMap}.
+ *
+ * @throws NullPointerException if any key or value in {@code valuesMap} is null
+ */
+ CoCombineResult(Map, Object> valuesMap) {
+ ImmutableMap.Builder, Object> builder = ImmutableMap.builder();
+ for (Entry, Object> entry : valuesMap.entrySet()) {
+ if (entry.getValue() != null) {
+ builder.put(entry);
+ } else {
+ builder.put(entry.getKey(), NullValue.INSTANCE);
+ }
+ }
+ this.valuesMap = builder.build();
+ }
+
+ /**
+ * Returns the value represented by the given {@link TupleTag}.
+ *
+ * It is an error to request a non-exist tuple tag from the {@link CoCombineResult}.
+ */
+ @SuppressWarnings("unchecked")
+ public V get(TupleTag tag) {
+ checkArgument(
+ valuesMap.keySet().contains(tag), "TupleTag " + tag + " is not in the CoCombineResult");
+ Object value = valuesMap.get(tag);
+ if (value == NullValue.INSTANCE) {
+ return null;
+ } else {
+ return (V) value;
+ }
+ }
+ }
+
+ /////////////////////////////////////////////////////////////////////////////
+
+ /**
+ * A composed {@link CombineFn} that applies multiple {@link CombineFn CombineFns}.
+ *
+ * For each {@link CombineFn} it extracts inputs from {@code DataT} with
+ * the {@code extractInputFn} and combines them,
+ * and then it outputs each combined value with a {@link TupleTag} to a
+ * {@link CoCombineResult}.
+ */
+ public static class ComposedCombineFn extends CombineFn {
+
+ private final List> combineFns;
+ private final List> extractInputFns;
+ private final List> outputTags;
+ private final int combineFnCount;
+
+ private ComposedCombineFn() {
+ this.extractInputFns = ImmutableList.of();
+ this.combineFns = ImmutableList.of();
+ this.outputTags = ImmutableList.of();
+ this.combineFnCount = 0;
+ }
+
+ private ComposedCombineFn(
+ ImmutableList> extractInputFns,
+ ImmutableList> combineFns,
+ ImmutableList> outputTags) {
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ List> castedExtractInputFns = (List) extractInputFns;
+ this.extractInputFns = castedExtractInputFns;
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ List> castedCombineFns = (List) combineFns;
+ this.combineFns = castedCombineFns;
+
+ this.outputTags = outputTags;
+ this.combineFnCount = this.combineFns.size();
+ }
+
+ /**
+ * Returns a {@link ComposedCombineFn} with an additional {@link CombineFn}.
+ */
+ public ComposedCombineFn with(
+ SimpleFunction extractInputFn,
+ CombineFn combineFn,
+ TupleTag outputTag) {
+ checkUniqueness(outputTags, outputTag);
+ return new ComposedCombineFn<>(
+ ImmutableList.>builder()
+ .addAll(extractInputFns)
+ .add(extractInputFn)
+ .build(),
+ ImmutableList.>builder()
+ .addAll(combineFns)
+ .add(combineFn)
+ .build(),
+ ImmutableList.>builder()
+ .addAll(outputTags)
+ .add(outputTag)
+ .build());
+ }
+
+ /**
+ * Returns a {@link ComposedCombineFnWithContext} with an additional
+ * {@link CombineFnWithContext}.
+ */
+ public ComposedCombineFnWithContext with(
+ SimpleFunction extractInputFn,
+ CombineFnWithContext combineFn,
+ TupleTag outputTag) {
+ checkUniqueness(outputTags, outputTag);
+ List> fnsWithContext = Lists.newArrayList();
+ for (CombineFn fn : combineFns) {
+ fnsWithContext.add(toFnWithContext(fn));
+ }
+ return new ComposedCombineFnWithContext<>(
+ ImmutableList.>builder()
+ .addAll(extractInputFns)
+ .add(extractInputFn)
+ .build(),
+ ImmutableList.>builder()
+ .addAll(fnsWithContext)
+ .add(combineFn)
+ .build(),
+ ImmutableList.>builder()
+ .addAll(outputTags)
+ .add(outputTag)
+ .build());
+ }
+
+ @Override
+ public Object[] createAccumulator() {
+ Object[] accumsArray = new Object[combineFnCount];
+ for (int i = 0; i < combineFnCount; ++i) {
+ accumsArray[i] = combineFns.get(i).createAccumulator();
+ }
+ return accumsArray;
+ }
+
+ @Override
+ public Object[] addInput(Object[] accumulator, DataT value) {
+ for (int i = 0; i < combineFnCount; ++i) {
+ Object input = extractInputFns.get(i).apply(value);
+ accumulator[i] = combineFns.get(i).addInput(accumulator[i], input);
+ }
+ return accumulator;
+ }
+
+ @Override
+ public Object[] mergeAccumulators(Iterable accumulators) {
+ Iterator iter = accumulators.iterator();
+ if (!iter.hasNext()) {
+ return createAccumulator();
+ } else {
+ // Reuses the first accumulator, and overwrites its values.
+ // It is safe because {@code accum[i]} only depends on
+ // the i-th component of each accumulator.
+ Object[] accum = iter.next();
+ for (int i = 0; i < combineFnCount; ++i) {
+ accum[i] = combineFns.get(i).mergeAccumulators(new ProjectionIterable(accumulators, i));
+ }
+ return accum;
+ }
+ }
+
+ @Override
+ public CoCombineResult extractOutput(Object[] accumulator) {
+ Map, Object> valuesMap = Maps.newHashMap();
+ for (int i = 0; i < combineFnCount; ++i) {
+ valuesMap.put(
+ outputTags.get(i),
+ combineFns.get(i).extractOutput(accumulator[i]));
+ }
+ return new CoCombineResult(valuesMap);
+ }
+
+ @Override
+ public Object[] compact(Object[] accumulator) {
+ for (int i = 0; i < combineFnCount; ++i) {
+ accumulator[i] = combineFns.get(i).compact(accumulator[i]);
+ }
+ return accumulator;
+ }
+
+ @Override
+ public Coder getAccumulatorCoder(CoderRegistry registry, Coder dataCoder)
+ throws CannotProvideCoderException {
+ List> coders = Lists.newArrayList();
+ for (int i = 0; i < combineFnCount; ++i) {
+ Coder inputCoder =
+ registry.getDefaultOutputCoder(extractInputFns.get(i), dataCoder);
+ coders.add(combineFns.get(i).getAccumulatorCoder(registry, inputCoder));
+ }
+ return new ComposedAccumulatorCoder(coders);
+ }
+ }
+
+ /**
+ * A composed {@link CombineFnWithContext} that applies multiple
+ * {@link CombineFnWithContext CombineFnWithContexts}.
+ *
+ * For each {@link CombineFnWithContext} it extracts inputs from {@code DataT} with
+ * the {@code extractInputFn} and combines them,
+ * and then it outputs each combined value with a {@link TupleTag} to a
+ * {@link CoCombineResult}.
+ */
+ public static class ComposedCombineFnWithContext
+ extends CombineFnWithContext {
+
+ private final List> extractInputFns;
+ private final List> combineFnWithContexts;
+ private final List> outputTags;
+ private final int combineFnCount;
+
+ private ComposedCombineFnWithContext() {
+ this.extractInputFns = ImmutableList.of();
+ this.combineFnWithContexts = ImmutableList.of();
+ this.outputTags = ImmutableList.of();
+ this.combineFnCount = 0;
+ }
+
+ private ComposedCombineFnWithContext(
+ ImmutableList> extractInputFns,
+ ImmutableList> combineFnWithContexts,
+ ImmutableList> outputTags) {
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ List> castedExtractInputFns =
+ (List) extractInputFns;
+ this.extractInputFns = castedExtractInputFns;
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ List> castedCombineFnWithContexts
+ = (List) combineFnWithContexts;
+ this.combineFnWithContexts = castedCombineFnWithContexts;
+
+ this.outputTags = outputTags;
+ this.combineFnCount = this.combineFnWithContexts.size();
+ }
+
+ /**
+ * Returns a {@link ComposedCombineFnWithContext} with an additional {@link GlobalCombineFn}.
+ */
+ public ComposedCombineFnWithContext with(
+ SimpleFunction extractInputFn,
+ GlobalCombineFn