diff --git a/runners/gearpump/README.md b/runners/gearpump/README.md
new file mode 100644
index 000000000000..ad043faea4c7
--- /dev/null
+++ b/runners/gearpump/README.md
@@ -0,0 +1,22 @@
+
+
+## Gearpump Beam Runner
+
+The Gearpump Beam runner allows users to execute pipelines written using the Apache Beam programming API with Apache Gearpump (incubating) as an execution engine.
\ No newline at end of file
diff --git a/runners/gearpump/pom.xml b/runners/gearpump/pom.xml
new file mode 100644
index 000000000000..c725daecba80
--- /dev/null
+++ b/runners/gearpump/pom.xml
@@ -0,0 +1,296 @@
+
+
+
+
+ 4.0.0
+
+
+ org.apache.beam
+ beam-runners-parent
+ 0.2.0-incubating-SNAPSHOT
+ ../pom.xml
+
+
+ beam-runners-gearpump
+
+ Apache Beam :: Runners :: Gearpump
+ jar
+
+
+ UTF-8
+ UTF-8
+ 0.8.1-SNAPSHOT
+
+
+
+
+ apache.snapshots
+ Apache Development Snapshot Repository
+ https://repository.apache.org/content/repositories/snapshots/
+
+ false
+
+
+ true
+
+
+
+ gearpump-shaded-repo
+ Vincent at Bintray
+ http://dl.bintray.com/fvunicorn/maven
+
+
+
+
+
+ org.apache.gearpump
+ gearpump-streaming_2.11
+ ${gearpump.version}
+ provided
+
+
+ org.apache.gearpump
+ gearpump-core_2.11
+ ${gearpump.version}
+ provided
+
+
+ com.google.code.findbugs
+ jsr305
+
+
+
+
+ org.apache.gearpump
+ gearpump-daemon_2.11
+ ${gearpump.version}
+ provided
+
+
+ org.apache.gearpump
+ gearpump-experimental-cgroup_2.11
+
+
+
+
+ com.typesafe
+ config
+ provided
+ 1.3.0
+
+
+ org.scala-lang
+ scala-library
+ 2.11.8
+ provided
+
+
+ org.apache.beam
+ beam-sdks-java-core
+
+
+ org.slf4j
+ slf4j-jdk14
+
+
+ com.google.collections
+ google-collections
+
+
+
+
+ org.apache.beam
+ beam-runners-core-java
+
+
+ com.google.code.findbugs
+ annotations
+ 3.0.1
+
+
+ org.slf4j
+ slf4j-api
+
+
+ joda-time
+ joda-time
+
+
+ com.fasterxml.jackson.core
+ jackson-core
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+
+
+ com.fasterxml.jackson.core
+ jackson-annotations
+
+
+ com.google.http-client
+ google-http-client
+
+
+ com.google.guava
+ guava
+
+
+ junit
+ junit
+ test
+
+
+ org.hamcrest
+ hamcrest-all
+ test
+
+
+ org.apache.beam
+ beam-sdks-java-core
+ test-jar
+ test
+
+
+ org.slf4j
+ slf4j-jdk14
+
+
+
+
+ org.mockito
+ mockito-all
+ test
+
+
+ com.google.auto.service
+ auto-service
+ 1.0-rc2
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+
+
+
+ true
+ true
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+
+
+
+
+ maven-failsafe-plugin
+
+
+
+ integration-test
+ verify
+
+
+
+
+ 1
+ -Dlog4j.configuration=log4j-test.properties -XX:-UseGCOverheadLimit
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+ 2.19.1
+
+
+ runnable-on-service-tests
+
+ org.apache.beam.sdk.testing.RunnableOnService
+ none
+ true
+
+ org.apache.beam:beam-sdks-java-core
+ org.apache.beam:beam-runners-java-core
+
+
+
+
+ org.apache.beam.sdk.io.BigQueryIOTest,
+ org.apache.beam.sdk.io.CountingInputTest,
+ org.apache.beam.sdk.io.CountingSourceTest,
+ org.apache.beam.sdk.testing.PAssertTest,
+ org.apache.beam.sdk.transforms.ApproximateUniqueTest,
+ org.apache.beam.sdk.transforms.CombineTest,
+ org.apache.beam.sdk.transforms.CombineFnsTest,
+ org.apache.beam.sdk.transforms.CountTest,
+ org.apache.beam.sdk.transforms.FlattenTest,
+ org.apache.beam.sdk.transforms.ParDoTest,
+ org.apache.beam.sdk.transforms.SampleTest,
+ org.apache.beam.sdk.transforms.ViewTest,
+ org.apache.beam.sdk.transforms.join.CoGroupByKeyTest
+
+
+
+ org.apache.beam.sdk.transforms.windowing.WindowingTest,
+ org.apache.beam.sdk.util.ReshuffleTest
+
+
+
+
+ [
+ "--runner=TestGearpumpRunner",
+ "--streaming=true"
+ ]
+
+
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-assembly-plugin
+
+
+ jar-with-dependencies
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-checkstyle-plugin
+
+
+
+
\ No newline at end of file
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineOptions.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineOptions.java
new file mode 100644
index 000000000000..5b6ee960745d
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineOptions.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump;
+
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import org.apache.gearpump.cluster.client.ClientContext;
+import org.apache.gearpump.cluster.embedded.EmbeddedCluster;
+
+import java.util.Map;
+
+/**
+ * Options that configure the Gearpump pipeline.
+ *
+ * <p>{@link Description} is declared on the getter of each property, next to
+ * {@link Default} and {@link JsonIgnore}, so all of a property's annotations sit together
+ * on the method from which Beam reads option metadata.
+ */
+public interface GearpumpPipelineOptions extends PipelineOptions {
+
+  void setApplicationName(String name);
+
+  @Description("set unique application name for Gearpump runner")
+  String getApplicationName();
+
+  void setParallelism(int parallelism);
+
+  @Description("set parallelism for Gearpump processor")
+  @Default.Integer(1)
+  int getParallelism();
+
+  void setSerializers(Map<String, String> serializers);
+
+  @Description("register Kryo serializers")
+  @JsonIgnore
+  Map<String, String> getSerializers();
+
+  void setEmbeddedCluster(EmbeddedCluster cluster);
+
+  @Description("set EmbeddedCluster for tests")
+  @JsonIgnore
+  EmbeddedCluster getEmbeddedCluster();
+
+  void setClientContext(ClientContext clientContext);
+
+  @Description("get client context to query application status")
+  @JsonIgnore
+  ClientContext getClientContext();
+}
+
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
new file mode 100644
index 000000000000..bc2714727259
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.gearpump;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.runners.AggregatorRetrievalException;
+import org.apache.beam.sdk.runners.AggregatorValues;
+import org.apache.beam.sdk.transforms.Aggregator;
+
+/**
+ * Result of executing a {@link Pipeline} with Gearpump.
+ *
+ * <p>This class holds no state: neither job status nor aggregators are tracked yet.
+ */
+public class GearpumpPipelineResult implements PipelineResult {
+
+  /**
+   * Reports the job state.
+   *
+   * @return always {@link State#UNKNOWN}, never {@code null}, so callers can inspect the
+   *     state without a null check
+   */
+  @Override
+  public State getState() {
+    return State.UNKNOWN;
+  }
+
+  /**
+   * Aggregator retrieval is not supported by this runner.
+   *
+   * @throws AggregatorRetrievalException always, with an
+   *     {@link UnsupportedOperationException} as its cause
+   */
+  @Override
+  public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator)
+      throws AggregatorRetrievalException {
+    throw new AggregatorRetrievalException(
+        "PipelineResult getAggregatorValues not supported in Gearpump pipeline",
+        new UnsupportedOperationException());
+  }
+}
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java
new file mode 100644
index 000000000000..660d7039737d
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.gearpump;
+
+import org.apache.beam.runners.gearpump.translators.TranslationContext;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.AssignWindows;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gearpump.cluster.ClusterConfig;
+import org.apache.gearpump.cluster.UserConfig;
+import org.apache.gearpump.cluster.client.ClientContext;
+import org.apache.gearpump.cluster.embedded.EmbeddedCluster;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A {@link PipelineRunner} that executes the operations in the
+ * pipeline by first translating them to Gearpump Stream DSL
+ * and then executing them on a Gearpump cluster.
+ *
+ * <p>This is based on DataflowPipelineRunner.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class GearpumpPipelineRunner extends PipelineRunner<GearpumpPipelineResult> {
+
+  private static final String GEARPUMP_SERIALIZERS = "gearpump.serializers";
+  private static final String DEFAULT_APPNAME = "beam_gearpump_app";
+
+  private final GearpumpPipelineOptions options;
+
+  public GearpumpPipelineRunner(GearpumpPipelineOptions options) {
+    this.options = options;
+  }
+
+  /**
+   * Builds a runner from generic {@link PipelineOptions}, validating them as
+   * {@link GearpumpPipelineOptions} first.
+   */
+  public static GearpumpPipelineRunner fromOptions(PipelineOptions options) {
+    GearpumpPipelineOptions pipelineOptions =
+        PipelineOptionsValidator.validate(GearpumpPipelineOptions.class, options);
+    return new GearpumpPipelineRunner(pipelineOptions);
+  }
+
+  /**
+   * Applies {@code transform} to {@code input}, special-casing three transforms:
+   * <ul>
+   *   <li>{@link Window.Bound} is wrapped in {@code AssignWindowsAndSetStrategy};</li>
+   *   <li>{@link Flatten.FlattenPCollectionList} over an empty list becomes an empty
+   *       {@link Create};</li>
+   *   <li>{@link Create.Values} yields a bounded primitive output with the global
+   *       default windowing strategy.</li>
+   * </ul>
+   * Every other transform is applied by the superclass.
+   */
+  @Override
+  public <OutputT extends POutput, InputT extends PInput> OutputT apply(
+      PTransform<InputT, OutputT> transform, InputT input) {
+    Class<?> transformClass = transform.getClass();
+    if (Window.Bound.class.equals(transformClass)) {
+      return (OutputT) super.apply(
+          new AssignWindowsAndSetStrategy((Window.Bound) transform), input);
+    }
+    if (Flatten.FlattenPCollectionList.class.equals(transformClass)
+        && ((PCollectionList<?>) input).size() == 0) {
+      return (OutputT) Pipeline.applyTransform(input, Create.of());
+    }
+    if (Create.Values.class.equals(transformClass)) {
+      return (OutputT) PCollection
+          .<OutputT>createPrimitiveOutputInternal(
+              input.getPipeline(),
+              WindowingStrategy.globalDefault(),
+              PCollection.IsBounded.BOUNDED);
+    }
+    return super.apply(transform, input);
+  }
+
+  /**
+   * Translates {@code pipeline} into a Gearpump streaming application and runs it.
+   *
+   * <p>The {@link ClientContext} used for the application is stored in the options via
+   * {@code setClientContext} so that callers can retrieve it after the run.
+   *
+   * @return a new, non-null {@link GearpumpPipelineResult}
+   */
+  @Override
+  public GearpumpPipelineResult run(Pipeline pipeline) {
+    String appName = options.getApplicationName();
+    if (null == appName) {
+      appName = DEFAULT_APPNAME;
+    }
+    Config config = registerSerializers(ClusterConfig.defaultConfig(),
+        options.getSerializers());
+    ClientContext clientContext = getClientContext(options, config);
+    options.setClientContext(clientContext);
+    JavaStreamApp streamApp = new JavaStreamApp(
+        appName, clientContext, UserConfig.empty());
+    TranslationContext translationContext = new TranslationContext(streamApp, options);
+    GearpumpPipelineTranslator translator = new GearpumpPipelineTranslator(translationContext);
+    translator.translate(pipeline);
+    streamApp.run();
+
+    // Return a real result object; a null here would NPE in any caller that inspects it.
+    return new GearpumpPipelineResult();
+  }
+
+  /**
+   * Returns the embedded cluster's client context when the options carry an
+   * {@link EmbeddedCluster}, otherwise a context created from {@code config}.
+   */
+  private ClientContext getClientContext(GearpumpPipelineOptions options, Config config) {
+    EmbeddedCluster cluster = options.getEmbeddedCluster();
+    if (cluster != null) {
+      return cluster.newClientContext();
+    }
+    return ClientContext.apply(config);
+  }
+
+  /**
+   * Registers the Beam classes listed below under {@code gearpump.serializers} with an
+   * empty serializer name (presumably selecting the default Kryo serializer), then
+   * overlays any user-supplied entries.
+   */
+  private Config registerSerializers(Config config, Map<String, String> userSerializers) {
+    Map<String, String> serializers = new HashMap<>();
+    serializers.put("org.apache.beam.sdk.util.WindowedValue$TimestampedValueInSingleWindow", "");
+    serializers.put("org.apache.beam.sdk.transforms.windowing.PaneInfo", "");
+    serializers.put("org.apache.beam.sdk.transforms.windowing.PaneInfo$Timing", "");
+    serializers.put("org.joda.time.Instant", "");
+    serializers.put("org.apache.beam.sdk.values.KV", "");
+    serializers.put("org.apache.beam.sdk.transforms.windowing.IntervalWindow", "");
+    serializers.put("org.apache.beam.sdk.values.TimestampedValue", "");
+    if (userSerializers != null && !userSerializers.isEmpty()) {
+      serializers.putAll(userSerializers);
+    }
+    return config.withValue(GEARPUMP_SERIALIZERS, ConfigValueFactory.fromMap(serializers));
+  }
+
+  /**
+   * Copied from DirectPipelineRunner.
+   * Used to replace Window.Bound until a window function is added to Gearpump Stream DSL.
+   */
+  private static class AssignWindowsAndSetStrategy<T>
+      extends PTransform<PCollection<T>, PCollection<T>> {
+
+    private final Window.Bound<T> wrapped;
+
+    public AssignWindowsAndSetStrategy(Window.Bound<T> wrapped) {
+      this.wrapped = wrapped;
+    }
+
+    @Override
+    public PCollection<T> apply(PCollection<T> input) {
+      WindowingStrategy<?, ?> outputStrategy =
+          wrapped.getOutputStrategyInternal(input.getWindowingStrategy());
+
+      WindowFn<T, BoundedWindow> windowFn =
+          (WindowFn<T, BoundedWindow>) outputStrategy.getWindowFn();
+
+      if (!windowFn.isNonMerging()) {
+        throw new UnsupportedOperationException(
+            "merging window is not supported in Gearpump pipeline");
+      }
+
+      // If the Window.Bound transform only changed parts other than the WindowFn, then
+      // we skip AssignWindows even though it should be harmless in a perfect world.
+      // The world is not perfect, and a GBK may have set it to InvalidWindows to forcibly
+      // crash if another GBK is performed without explicitly setting the WindowFn. So we skip
+      // AssignWindows in this case.
+      if (wrapped.getWindowFn() == null) {
+        return input.apply("Identity", ParDo.of(new IdentityFn<T>()))
+            .setWindowingStrategyInternal(outputStrategy);
+      } else {
+        return input
+            .apply("AssignWindows", new AssignWindows<>(windowFn))
+            .setWindowingStrategyInternal(outputStrategy);
+      }
+    }
+  }
+
+  /** Pass-through {@link DoFn} that re-emits every element unchanged. */
+  private static class IdentityFn<T> extends DoFn<T, T> {
+    @Override
+    public void processElement(ProcessContext c) {
+      c.output(c.element());
+    }
+  }
+}
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunnerRegistrar.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunnerRegistrar.java
new file mode 100644
index 000000000000..2b9e89e6a4c4
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunnerRegistrar.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump;
+
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Contains the {@link PipelineRunnerRegistrar} and {@link PipelineOptionsRegistrar} for the
+ * {@link GearpumpPipelineRunner}.
+ *
+ * <p>{@link AutoService} will register Gearpump's implementations of the {@link PipelineRunner}
+ * and {@link PipelineOptions} as available pipeline runner services.
+ */
+public class GearpumpPipelineRunnerRegistrar {
+  private GearpumpPipelineRunnerRegistrar() { }
+
+  /**
+   * Registers the {@link GearpumpPipelineRunner} and the {@link TestGearpumpRunner}.
+   */
+  @AutoService(PipelineRunnerRegistrar.class)
+  public static class Runner implements PipelineRunnerRegistrar {
+
+    /**
+     * Lists both runner classes so that either can be selected by name.
+     */
+    @Override
+    public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
+      return ImmutableList.<Class<? extends PipelineRunner<?>>>of(
+          GearpumpPipelineRunner.class,
+          TestGearpumpRunner.class);
+    }
+  }
+
+  /**
+   * Registers the {@link GearpumpPipelineOptions}.
+   */
+  @AutoService(PipelineOptionsRegistrar.class)
+  public static class Options implements PipelineOptionsRegistrar {
+
+    @Override
+    public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+      return ImmutableList.<Class<? extends PipelineOptions>>of(GearpumpPipelineOptions.class);
+    }
+  }
+}
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
new file mode 100644
index 000000000000..59f0df7e8d3e
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump;
+
+import org.apache.beam.runners.gearpump.translators.CreateValuesTranslator;
+import org.apache.beam.runners.gearpump.translators.FlattenPCollectionTranslator;
+import org.apache.beam.runners.gearpump.translators.GroupByKeyTranslator;
+import org.apache.beam.runners.gearpump.translators.ParDoBoundMultiTranslator;
+import org.apache.beam.runners.gearpump.translators.ParDoBoundTranslator;
+import org.apache.beam.runners.gearpump.translators.ReadBoundedTranslator;
+import org.apache.beam.runners.gearpump.translators.ReadUnboundedTranslator;
+import org.apache.beam.runners.gearpump.translators.TransformTranslator;
+import org.apache.beam.runners.gearpump.translators.TranslationContext;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.runners.TransformTreeNode;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PValue;
+
+import org.apache.gearpump.util.Graph;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * {@link GearpumpPipelineTranslator} knows how to translate {@link Pipeline} objects
+ * into Gearpump {@link Graph}.
+ *
+ * <p>The pipeline is traversed topologically and every primitive transform is handed to
+ * the {@link TransformTranslator} registered for that transform's exact class.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class GearpumpPipelineTranslator implements Pipeline.PipelineVisitor {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      GearpumpPipelineTranslator.class);
+
+  /**
+   * A map from {@link PTransform} subclass to the corresponding
+   * {@link TransformTranslator} to use to translate that transform.
+   */
+  private static final Map<Class<? extends PTransform>, TransformTranslator>
+      transformTranslators = new HashMap<>();
+
+  static {
+    // register TransformTranslators
+    registerTransformTranslator(ParDo.Bound.class, new ParDoBoundTranslator());
+    registerTransformTranslator(ParDo.BoundMulti.class, new ParDoBoundMultiTranslator());
+    registerTransformTranslator(Read.Bounded.class, new ReadBoundedTranslator());
+    registerTransformTranslator(Read.Unbounded.class, new ReadUnboundedTranslator());
+    registerTransformTranslator(GroupByKey.class, new GroupByKeyTranslator());
+    registerTransformTranslator(Flatten.FlattenPCollectionList.class,
+        new FlattenPCollectionTranslator());
+    registerTransformTranslator(Create.Values.class, new CreateValuesTranslator());
+  }
+
+  private final TranslationContext translationContext;
+
+  public GearpumpPipelineTranslator(TranslationContext translationContext) {
+    this.translationContext = translationContext;
+  }
+
+  /** Traverses {@code pipeline} topologically with this object as the visitor. */
+  public void translate(Pipeline pipeline) {
+    pipeline.traverseTopologically(this);
+  }
+
+  @Override
+  public CompositeBehavior enterCompositeTransform(TransformTreeNode node) {
+    LOG.debug("entering composite transform {}", node.getTransform());
+    return CompositeBehavior.ENTER_TRANSFORM;
+  }
+
+  @Override
+  public void leaveCompositeTransform(TransformTreeNode node) {
+    LOG.debug("leaving composite transform {}", node.getTransform());
+  }
+
+  /**
+   * Translates one primitive transform with the translator registered for its class.
+   *
+   * @throws IllegalStateException if no translator is registered for the transform
+   */
+  @Override
+  public void visitPrimitiveTransform(TransformTreeNode node) {
+    PTransform transform = node.getTransform();
+    LOG.debug("visiting transform {}", transform);
+    TransformTranslator translator = getTransformTranslator(transform.getClass());
+    if (null == translator) {
+      throw new IllegalStateException(
+          "no translator registered for " + transform);
+    }
+    translationContext.setCurrentTransform(node);
+    translator.translate(transform, translationContext);
+  }
+
+  @Override
+  public void visitValue(PValue value, TransformTreeNode producer) {
+    LOG.debug("visiting value {}", value);
+  }
+
+  /**
+   * Records that instances of the specified PTransform class
+   * should be translated by default by the corresponding
+   * {@link TransformTranslator}.
+   *
+   * @throws IllegalArgumentException if a translator is already registered for the class
+   */
+  private static <TransformT extends PTransform> void registerTransformTranslator(
+      Class<TransformT> transformClass,
+      TransformTranslator<? extends TransformT> transformTranslator) {
+    if (transformTranslators.put(transformClass, transformTranslator) != null) {
+      throw new IllegalArgumentException(
+          "defining multiple translators for " + transformClass);
+    }
+  }
+
+  /**
+   * Returns the {@link TransformTranslator} to use for instances of the
+   * specified PTransform class, or null if none registered.
+   */
+  private <TransformT extends PTransform>
+  TransformTranslator<TransformT> getTransformTranslator(Class<TransformT> transformClass) {
+    return transformTranslators.get(transformClass);
+  }
+}
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java
new file mode 100644
index 000000000000..cedd31ff4ab2
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+
+import org.apache.gearpump.cluster.embedded.EmbeddedCluster;
+
+/**
+ * Gearpump {@link PipelineRunner} for tests, which uses {@link EmbeddedCluster}.
+ *
+ * <p>An embedded cluster is started when the runner is created and stopped once
+ * {@link #run} finishes, whether or not the pipeline ran successfully.
+ */
+public class TestGearpumpRunner extends PipelineRunner<GearpumpPipelineResult> {
+
+  private final GearpumpPipelineRunner delegate;
+  private final EmbeddedCluster cluster;
+
+  private TestGearpumpRunner(GearpumpPipelineOptions options) {
+    cluster = EmbeddedCluster.apply();
+    cluster.start();
+    options.setEmbeddedCluster(cluster);
+    delegate = GearpumpPipelineRunner.fromOptions(options);
+  }
+
+  /** Validates {@code options} as {@link GearpumpPipelineOptions} and builds the runner. */
+  public static TestGearpumpRunner fromOptions(PipelineOptions options) {
+    GearpumpPipelineOptions pipelineOptions =
+        PipelineOptionsValidator.validate(GearpumpPipelineOptions.class, options);
+    return new TestGearpumpRunner(pipelineOptions);
+  }
+
+  /**
+   * Runs the pipeline on the embedded cluster via the delegate runner.
+   *
+   * <p>The cluster is stopped in a {@code finally} block so that a failing pipeline does
+   * not leave it running.
+   */
+  @Override
+  public GearpumpPipelineResult run(Pipeline pipeline) {
+    try {
+      return delegate.run(pipeline);
+    } finally {
+      cluster.stop();
+    }
+  }
+
+  /** Delegates transform application to the wrapped {@link GearpumpPipelineRunner}. */
+  @Override
+  public <OutputT extends POutput, InputT extends PInput> OutputT apply(
+      PTransform<InputT, OutputT> transform, InputT input) {
+    return delegate.apply(transform, input);
+  }
+}
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java
new file mode 100644
index 000000000000..c51289d5fefe
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.examples;
+
+import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
+import org.apache.beam.runners.gearpump.GearpumpPipelineRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+import org.apache.gearpump.cluster.client.ClientContext;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Streaming word-count example for the Gearpump runner.
+ *
+ * <p>Reads lines from an {@link UnboundedTextSource}, splits them into words, counts each
+ * word per 10-second fixed window, and emits (and debug-logs) the formatted counts.
+ */
+public class StreamingWordCount {
+
+  /** Splits each input line into words and counts empty lines in an aggregator. */
+  static class ExtractWordsFn extends DoFn<String, String> {
+    private final Aggregator<Long, Long> emptyLines =
+        createAggregator("emptyLines", new Sum.SumLongFn());
+
+    @Override
+    public void processElement(ProcessContext c) {
+      String line = c.element();
+      if (line.trim().isEmpty()) {
+        emptyLines.addValue(1L);
+      }
+
+      // Split on runs of anything that is not an ASCII letter or apostrophe.
+      for (String word : line.split("[^a-zA-Z']+")) {
+        // A leading separator yields an empty first token; skip it.
+        if (!word.isEmpty()) {
+          c.output(word);
+        }
+      }
+    }
+  }
+
+  /** Formats each word count as {@code "word - count @ timestamp"}. */
+  static class FormatAsStringFn extends DoFn<KV<String, Long>, String> {
+    private static final Logger LOG = LoggerFactory.getLogger(FormatAsStringFn.class);
+
+    @Override
+    public void processElement(ProcessContext c) {
+      KV<String, Long> wordCount = c.element();
+      String row = wordCount.getKey()
+          + " - " + wordCount.getValue()
+          + " @ " + c.timestamp().toString();
+      LOG.debug("output {}", row);
+      c.output(row);
+    }
+  }
+
+  /**
+   * Builds and runs the word-count pipeline, then closes the Gearpump client context.
+   *
+   * @param args command-line pipeline options
+   */
+  public static void main(String[] args) {
+    GearpumpPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
+        .as(GearpumpPipelineOptions.class);
+    options.setApplicationName("StreamingWordCount");
+    options.setRunner(GearpumpPipelineRunner.class);
+    options.setParallelism(1);
+    Pipeline p = Pipeline.create(options);
+
+    PCollection<KV<String, Long>> wordCounts =
+        p.apply(Read.from(new UnboundedTextSource()))
+            .apply(ParDo.of(new ExtractWordsFn()))
+            .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10))))
+            .apply(Count.<String>perElement());
+
+    wordCounts.apply(ParDo.of(new FormatAsStringFn()));
+
+    try {
+      p.run();
+    } finally {
+      // The runner stores its ClientContext in the options; it is still null if run()
+      // failed before the context was created.
+      ClientContext clientContext = options.getClientContext();
+      if (clientContext != null) {
+        clientContext.close();
+      }
+    }
+  }
+}
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/UnboundedTextSource.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/UnboundedTextSource.java
new file mode 100644
index 000000000000..caf066c9710a
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/UnboundedTextSource.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.examples;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+import org.joda.time.Instant;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import javax.annotation.Nullable;
+
+/**
+ * Unbounded source that endlessly cycles through a fixed set of text lines.
+ */
+public class UnboundedTextSource extends UnboundedSource {
+  /** Never splits: the returned list holds only this source, whatever split count is asked. */
+  @Override
+  public List extends UnboundedSource> generateInitialSplits(
+      int desiredNumSplits, PipelineOptions options) throws Exception {
+    return Collections.>singletonList(this);
+  }
+  /** Creates a reader over this source; {@code checkpointMark} is ignored. */
+  @Override
+  public UnboundedReader createReader(PipelineOptions options,
+      @Nullable CheckpointMark checkpointMark) {
+    return new UnboundedTextReader(this);
+  }
+  /** Returns {@code null}: this source provides no checkpoint mark coder. */
+  @Nullable
+  @Override
+  public Coder getCheckpointMarkCoder() {
+    return null;
+  }
+  /** Nothing to validate. */
+  @Override
+  public void validate() {
+  }
+  /** Elements are encoded with {@link StringUtf8Coder}. */
+  @Override
+  public Coder getDefaultOutputCoder() {
+    return StringUtf8Coder.of();
+  }
+
+  /**
+   * Reader that cycles through two fixed lines, one record per call, without ever ending.
+   */
+  public static class UnboundedTextReader extends UnboundedReader implements Serializable {
+
+    private static final long serialVersionUID = 7526472295622776147L;
+
+    private final UnboundedTextSource source;
+
+    private final String[] texts = new String[]{"foo foo foo bar bar", "foo foo bar bar bar"};
+    private long index = 0; // number of advance() calls made so far
+
+    private String currentRecord;
+
+    private Instant currentTimestamp;
+
+    public UnboundedTextReader(UnboundedTextSource source) {
+      this.source = source;
+    }
+    /** Positions the reader on the first line with timestamp {@code new Instant(0)}. */
+    @Override
+    public boolean start() throws IOException {
+      currentRecord = texts[0];
+      currentTimestamp = new Instant(0);
+      return true;
+    }
+    /** Moves to the next line, wrapping around; the timestamp is built from index * 1000. */
+    @Override
+    public boolean advance() throws IOException {
+      index++;
+      // Take the modulo on the long before narrowing; (int) index alone goes negative past 2^31.
+      currentRecord = texts[(int) (index % texts.length)];
+      currentTimestamp = new Instant(index * 1000);
+      return true;
+    }
+    /** Always returns an empty byte array. */
+    @Override
+    public byte[] getCurrentRecordId() throws NoSuchElementException {
+      return new byte[0];
+    }
+    /** Returns the line selected by the last {@code start()} or {@code advance()} call. */
+    @Override
+    public String getCurrent() throws NoSuchElementException {
+      return this.currentRecord;
+    }
+    /** Returns the timestamp set by the last {@code start()} or {@code advance()} call. */
+    @Override
+    public Instant getCurrentTimestamp() throws NoSuchElementException {
+      return currentTimestamp;
+    }
+    /** Nothing to release. */
+    @Override
+    public void close() throws IOException {
+    }
+    /** The watermark is the current record's timestamp ({@code null} before {@code start()}). */
+    @Override
+    public Instant getWatermark() {
+      return currentTimestamp;
+    }
+    /** Returns {@code null}: no checkpoint mark is produced. */
+    @Override
+    public CheckpointMark getCheckpointMark() {
+      return null;
+    }
+    /** Returns the source this reader was created from. */
+    @Override
+    public UnboundedSource getCurrentSource() {
+      return this.source;
+    }
+  }
+}
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateValuesTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateValuesTranslator.java
new file mode 100644
index 000000000000..452127aa2c9b
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateValuesTranslator.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators;
+
+import org.apache.beam.runners.gearpump.translators.io.UnboundedSourceWrapper;
+import org.apache.beam.runners.gearpump.translators.io.ValuesSource;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.util.WindowedValue;
+
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+
+/**
+ * Wraps the elements of {@link Create.Values} into an {@link UnboundedSource}.
+ * Mainly used for tests.
+ */
+public class CreateValuesTranslator implements TransformTranslator> {
+ /** Registers a source stream over the transform's elements as the transform's output. */
+ @Override
+ public void translate(Create.Values transform, TranslationContext context) {
+ try {
+ UnboundedSourceWrapper unboundedSourceWrapper = new UnboundedSourceWrapper<>(
+ new ValuesSource<>(transform.getElements(),
+ transform.getDefaultOutputCoder(context.getInput(transform))),
+ context.getPipelineOptions());
+ JavaStream> sourceStream = context.getSourceStream(unboundedSourceWrapper);
+ context.setOutputStream(context.getOutput(transform), sourceStream);
+ } catch (CannotProvideCoderException e) {
+ throw new RuntimeException(e); // translate() declares no checked exceptions
+ }
+ }
+}
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionTranslator.java
new file mode 100644
index 000000000000..b06d5a840817
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionTranslator.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators;
+
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.values.PCollection;
+
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+
+/**
+ * Translates {@link Flatten.FlattenPCollectionList} into a chain of Gearpump merge operations.
+ * Note only two-way merge is working now.
+ */
+public class FlattenPCollectionTranslator implements
+    TransformTranslator> {
+  /** Merges the input streams one after another and registers the result as the output. */
+  @Override
+  public void translate(Flatten.FlattenPCollectionList transform, TranslationContext context) {
+    // Stays null when the input list is empty; otherwise the first stream seeds the merge.
+    JavaStream merged = null;
+    for (PCollection collection : context.getInput(transform).getAll()) {
+      JavaStream inputStream = context.getInputStream(collection);
+      if (null == merged) {
+        merged = inputStream;
+      } else {
+        merged = merged.merge(inputStream, transform.getName());
+      }
+    }
+    context.setOutputStream(context.getOutput(transform), merged);
+  }
+}
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
new file mode 100644
index 000000000000..f36b908f842d
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators;
+
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+
+import com.google.common.collect.Iterables;
+
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction;
+import org.apache.gearpump.streaming.javaapi.dsl.functions.GroupByFunction;
+import org.apache.gearpump.streaming.javaapi.dsl.functions.MapFunction;
+import org.apache.gearpump.streaming.javaapi.dsl.functions.ReduceFunction;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Translates {@link GroupByKey} to a Gearpump flatMap, groupBy, map and reduce chain.
+ */
+public class GroupByKeyTranslator implements TransformTranslator> {
+ @Override
+ public void translate(GroupByKey transform, TranslationContext context) {
+ JavaStream>> inputStream =
+ context.getInputStream(context.getInput(transform));
+ int parallelism = context.getPipelineOptions().getParallelism();
+ JavaStream>>> outputStream = inputStream
+ .flatMap(new KeyedByKeyAndWindow(), "keyed_by_Key_and_Window")
+ .groupBy(new GroupByKeyAndWindow(), parallelism, "group_by_Key_and_Window")
+ .map(new ExtractKeyValue(), "extract_Key_and_Value")
+ .reduce(new MergeValue(), "merge_value");
+
+ context.setOutputStream(context.getOutput(transform), outputStream);
+ }
+ /** Emits one copy of the value per window it belongs to, keyed by the (key, window) pair. */
+ private static class KeyedByKeyAndWindow implements
+ FlatMapFunction>, WindowedValue, V>>> {
+
+ @Override
+ public Iterator, V>>> apply(WindowedValue> wv) {
+ List, V>>> ret = new ArrayList<>(wv.getWindows().size
+ ());
+ for (BoundedWindow window : wv.getWindows()) {
+ KV keyWin = KV.of(wv.getValue().getKey(), window);
+ ret.add(WindowedValue.of(KV.of(keyWin, wv.getValue().getValue()),
+ wv.getTimestamp(), window, wv.getPane()));
+ }
+ return ret.iterator();
+ }
+ }
+ /** Uses the (key, window) pair as the grouping key. */
+ private static class GroupByKeyAndWindow implements
+ GroupByFunction, V>>, KV> {
+
+ @Override
+ public KV apply(WindowedValue, V>> wv) {
+ return wv.getValue().getKey();
+ }
+ }
+ /** Drops the window from the key and wraps the value in a single-element list. */
+ private static class ExtractKeyValue implements
+ MapFunction, V>>,
+ WindowedValue>>> {
+ @Override
+ public WindowedValue>> apply(WindowedValue, V>> wv) {
+ return WindowedValue.of(KV.of(wv.getValue().getKey().getKey(),
+ (Iterable) Collections.singletonList(wv.getValue().getValue())),
+ wv.getTimestamp(), wv.getWindows(), wv.getPane());
+ }
+ }
+ /** Concatenates the value iterables; key, timestamp, windows and pane come from wv1. */
+ private static class MergeValue implements
+ ReduceFunction>>> {
+ @Override
+ public WindowedValue>> apply(WindowedValue>> wv1,
+ WindowedValue>> wv2) {
+ return WindowedValue.of(KV.of(wv1.getValue().getKey(),
+ Iterables.concat(wv1.getValue().getValue(), wv2.getValue().getValue())),
+ wv1.getTimestamp(), wv1.getWindows(), wv1.getPane());
+ }
+ }
+}
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
new file mode 100644
index 000000000000..af5bcbcf4c88
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators;
+
+import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
+import org.apache.beam.runners.gearpump.translators.utils.GearpumpDoFnRunner;
+import org.apache.beam.runners.gearpump.translators.utils.NoOpSideInputReader;
+import org.apache.beam.runners.gearpump.translators.utils.NoOpStepContext;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.DoFnRunner;
+import org.apache.beam.sdk.util.DoFnRunners;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+
+import com.google.common.collect.Lists;
+
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.javaapi.dsl.functions.FilterFunction;
+import org.apache.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction;
+import org.apache.gearpump.streaming.javaapi.dsl.functions.MapFunction;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link ParDo.BoundMulti} is translated to Gearpump flatMap function
+ * with {@link DoFn} wrapped in {@link DoFnMultiFunction}. The outputs are
+ * further filtered with Gearpump filter function by output tag.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class ParDoBoundMultiTranslator implements
+    TransformTranslator> {
+  /** Runs the DoFn in one flatMap, then derives one stream per output by filtering on its tag. */
+  @Override
+  public void translate(ParDo.BoundMulti transform, TranslationContext context) {
+    PCollection inputT = (PCollection) context.getInput(transform);
+    JavaStream> inputStream = context.getInputStream(inputT);
+    Map, PCollection>> outputs = context.getOutput(transform).getAll();
+
+    JavaStream, OutputT>>> outputStream = inputStream.flatMap(
+        new DoFnMultiFunction<>(
+            context.getPipelineOptions(),
+            transform.getFn(),
+            transform.getMainOutputTag(),
+            transform.getSideOutputTags(),
+            inputT.getWindowingStrategy(),
+            new NoOpSideInputReader()
+        ), transform.getName());
+    for (Map.Entry, PCollection>> output : outputs.entrySet()) {
+      JavaStream> taggedStream = outputStream
+          .filter(new FilterByOutputTag<>((TupleTag) output.getKey()),
+              "filter_by_output_tag")
+          .map(new ExtractOutput(), "extract output");
+
+      context.setOutputStream(output.getValue(), taggedStream);
+    }
+  }
+
+  /**
+   * Gearpump {@link FlatMapFunction} wrapper over a Beam {@link DoFn} with tagged outputs.
+   */
+  private static class DoFnMultiFunction implements
+      FlatMapFunction, WindowedValue, OutputT>>>,
+      DoFnRunners.OutputManager {
+    private final DoFnRunner doFnRunner;
+    private List, OutputT>>> outputs = Lists
+        .newArrayList();
+
+    public DoFnMultiFunction(
+        GearpumpPipelineOptions pipelineOptions,
+        DoFn doFn,
+        TupleTag mainOutputTag,
+        TupleTagList sideOutputTags,
+        WindowingStrategy, ?> windowingStrategy,
+        SideInputReader sideInputReader) {
+      this.doFnRunner = new GearpumpDoFnRunner<>(
+          pipelineOptions,
+          doFn,
+          sideInputReader,
+          this,
+          mainOutputTag,
+          sideOutputTags.getAll(),
+          new NoOpStepContext(),
+          windowingStrategy
+      );
+    }
+    /** Runs the DoFn on one element as its own bundle and returns that element's outputs. */
+    @Override
+    public Iterator, OutputT>>> apply(WindowedValue wv) {
+      // Fresh buffer per element; reusing one list would replay earlier elements' outputs.
+      outputs = Lists.newArrayList();
+      doFnRunner.startBundle();
+      doFnRunner.processElement(wv);
+      doFnRunner.finishBundle();
+      return outputs.iterator();
+    }
+    /** Buffers an output of the DoFn, paired with the tag it was emitted to. */
+    @Override
+    public void output(TupleTag tag, WindowedValue output) {
+      KV, OutputT> kv = KV.of((TupleTag) tag,
+          (OutputT) output.getValue());
+      outputs.add(WindowedValue.of(kv, output.getTimestamp(),
+          output.getWindows(), output.getPane()));
+    }
+  }
+  /** Keeps only the values whose tag equals the tag given at construction. */
+  private static class FilterByOutputTag implements
+      FilterFunction, OutputT>>> {
+
+    private final TupleTag tupleTag;
+
+    public FilterByOutputTag(TupleTag tupleTag) {
+      this.tupleTag = tupleTag;
+    }
+
+    @Override
+    public boolean apply(WindowedValue, OutputT>> wv) {
+      return wv.getValue().getKey().equals(tupleTag);
+    }
+  }
+  /** Strips the tag, keeping the value with its timestamp, windows and pane. */
+  private static class ExtractOutput implements
+      MapFunction, OutputT>>, WindowedValue> {
+
+    @Override
+    public WindowedValue apply(WindowedValue, OutputT>> wv) {
+      return WindowedValue.of(wv.getValue().getValue(), wv.getTimestamp(),
+          wv.getWindows(), wv.getPane());
+    }
+  }
+}
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java
new file mode 100644
index 000000000000..689bc08e0f47
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators;
+
+import org.apache.beam.runners.gearpump.translators.functions.DoFnFunction;
+import org.apache.beam.runners.gearpump.translators.utils.NoOpSideInputReader;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollection;
+
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+
+
+/**
+ * {@link ParDo.Bound} is translated to Gearpump flatMap function
+ * with {@link DoFn} wrapped in {@link DoFnFunction}.
+ */
+public class ParDoBoundTranslator implements
+ TransformTranslator> {
+ /** Applies the transform's {@link DoFn} to the input stream through a flatMap. */
+ @Override
+ public void translate(ParDo.Bound transform, TranslationContext context) {
+ DoFn doFn = transform.getFn();
+ PCollection output = context.getOutput(transform);
+ WindowingStrategy, ?> windowingStrategy = output.getWindowingStrategy();
+ // NOTE(review): a NoOpSideInputReader is passed, so side inputs look unsupported — confirm.
+ DoFnFunction doFnFunction = new DoFnFunction<>(context.getPipelineOptions(),
+ doFn, windowingStrategy, new NoOpSideInputReader());
+ JavaStream> inputStream =
+ context.getInputStream(context.getInput(transform));
+ JavaStream> outputStream =
+ inputStream.flatMap(doFnFunction, transform.getName());
+
+ context.setOutputStream(context.getOutput(transform), outputStream);
+ }
+}
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadBoundedTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadBoundedTranslator.java
new file mode 100644
index 000000000000..478d58f87b1e
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadBoundedTranslator.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators;
+
+import org.apache.beam.runners.gearpump.translators.io.BoundedSourceWrapper;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.source.DataSource;
+
+/**
+ * {@link Read.Bounded} is translated to Gearpump source function
+ * and {@link BoundedSource} is wrapped into Gearpump {@link DataSource}.
+ */
+public class ReadBoundedTranslator implements TransformTranslator> {
+ /** Wraps the transform's source and registers the resulting stream as its output. */
+ @Override
+ public void translate(Read.Bounded transform, TranslationContext context) {
+ BoundedSource boundedSource = transform.getSource();
+ BoundedSourceWrapper sourceWrapper = new BoundedSourceWrapper<>(boundedSource,
+ context.getPipelineOptions());
+ JavaStream> sourceStream = context.getSourceStream(sourceWrapper);
+
+ context.setOutputStream(context.getOutput(transform), sourceStream);
+ }
+
+}
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadUnboundedTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadUnboundedTranslator.java
new file mode 100644
index 000000000000..7e12a9c629fd
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadUnboundedTranslator.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators;
+
+import org.apache.beam.runners.gearpump.translators.io.UnboundedSourceWrapper;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.util.WindowedValue;
+
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.source.DataSource;
+
+/**
+ * {@link Read.Unbounded} is translated to Gearpump source function
+ * and {@link UnboundedSource} is wrapped into Gearpump {@link DataSource}.
+ */
+
+public class ReadUnboundedTranslator implements TransformTranslator> {
+ /** Wraps the transform's source and registers the resulting stream as its output. */
+ @Override
+ public void translate(Read.Unbounded transform, TranslationContext context) {
+ UnboundedSource unboundedSource = transform.getSource();
+ UnboundedSourceWrapper unboundedSourceWrapper = new UnboundedSourceWrapper<>(
+ unboundedSource, context.getPipelineOptions());
+ JavaStream> sourceStream = context.getSourceStream(unboundedSourceWrapper);
+
+ context.setOutputStream(context.getOutput(transform), sourceStream);
+ }
+
+}
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TransformTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TransformTranslator.java
new file mode 100644
index 000000000000..1ed6d5daa28e
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TransformTranslator.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators;
+
+
+import org.apache.beam.sdk.transforms.PTransform;
+
+import java.io.Serializable;
+
+/**
+ * Translates a {@link PTransform} to Gearpump functions using a {@link TranslationContext}.
+ */
+public interface TransformTranslator extends Serializable {
+ void translate(T transform, TranslationContext context);
+}
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
new file mode 100644
index 000000000000..b9b2c7aeeb91
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
+import org.apache.beam.sdk.runners.TransformTreeNode;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+
+import org.apache.gearpump.cluster.UserConfig;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+import org.apache.gearpump.streaming.source.DataSource;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Maintains context data for {@link TransformTranslator}s.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class TranslationContext {
+
+ private final JavaStreamApp streamApp;
+ private final GearpumpPipelineOptions pipelineOptions;
+ private AppliedPTransform, ?, ?> currentTransform;
+ private final Map> streams = new HashMap<>();
+
+ public TranslationContext(JavaStreamApp streamApp, GearpumpPipelineOptions pipelineOptions) {
+ this.streamApp = streamApp;
+ this.pipelineOptions = pipelineOptions;
+
+ }
+ /** Records the node being translated; getInput and getOutput only accept its transform. */
+ public void setCurrentTransform(TransformTreeNode treeNode) {
+ this.currentTransform = AppliedPTransform.of(treeNode.getFullName(),
+ treeNode.getInput(), treeNode.getOutput(), (PTransform) treeNode.getTransform());
+ }
+
+ public GearpumpPipelineOptions getPipelineOptions() {
+ return pipelineOptions;
+ }
+ /** Returns the stream registered for {@code input}, or {@code null} if there is none. */
+ public JavaStream getInputStream(PValue input) {
+ return (JavaStream) streams.get(input);
+ }
+ /** Registers the stream for {@code output}; an already registered stream is kept. */
+ public void setOutputStream(PValue output, JavaStream outputStream) {
+ if (!streams.containsKey(output)) {
+ streams.put(output, outputStream);
+ }
+ }
+ /** Returns the input of {@code transform}, which must be the current transform. */
+ public InputT getInput(PTransform transform) {
+ return (InputT) getCurrentTransform(transform).getInput();
+ }
+ /** Returns the output of {@code transform}, which must be the current transform. */
+ public OutputT getOutput(PTransform, OutputT> transform) {
+ return (OutputT) getCurrentTransform(transform).getOutput();
+ }
+ /** Checks, by reference, that {@code transform} is the one set by setCurrentTransform. */
+ private AppliedPTransform, ?, ?> getCurrentTransform(PTransform, ?> transform) {
+ checkArgument(
+ currentTransform != null && currentTransform.getTransform() == transform,
+ "can only be called with current transform");
+ return currentTransform;
+ }
+ /** Creates a source stream named "source" with the parallelism from the pipeline options. */
+ public JavaStream getSourceStream(DataSource dataSource) {
+ return streamApp.source(dataSource, pipelineOptions.getParallelism(),
+ UserConfig.empty(), "source");
+ }
+
+}
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
new file mode 100644
index 000000000000..088fc14c10f8
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators.functions;
+
+import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
+import org.apache.beam.runners.gearpump.translators.utils.GearpumpDoFnRunner;
+import org.apache.beam.runners.gearpump.translators.utils.NoOpStepContext;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.util.DoFnRunner;
+import org.apache.beam.sdk.util.DoFnRunners;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+
+import com.google.api.client.util.Lists;
+
+import org.apache.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction;
+
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Adapts a Beam {@link DoFn} to Gearpump's {@link FlatMapFunction}.
+ *
+ * <p>The function is its own {@link DoFnRunners.OutputManager}: everything the {@link DoFn}
+ * emits on the main tag while one input value is processed is buffered and handed back to
+ * Gearpump as the result of {@link #apply}.
+ */
+public class DoFnFunction implements
+    FlatMapFunction, WindowedValue>, DoFnRunners.OutputManager {
+
+  // Anonymous subclass instance used as the single supported output tag.
+  private final TupleTag mainTag = new TupleTag() {
+  };
+  private final DoFnRunner doFnRunner;
+  // Elements emitted for the input currently being processed; replaced on every apply().
+  private List> outputs = Lists.newArrayList();
+
+  /**
+   * Builds the runner that drives {@code doFn}, registering this function as the output
+   * manager, with no side output tags and a {@link NoOpStepContext}.
+   */
+  public DoFnFunction(
+      GearpumpPipelineOptions pipelineOptions,
+      DoFn doFn,
+      WindowingStrategy, ?> windowingStrategy,
+      SideInputReader sideInputReader) {
+    this.doFnRunner = new GearpumpDoFnRunner<>(
+        pipelineOptions, doFn, sideInputReader,
+        this, mainTag, TupleTagList.empty().getAll(),
+        new NoOpStepContext(), windowingStrategy);
+  }
+
+  /**
+   * Runs the {@link DoFn} over a single value as a bundle of its own and returns an iterator
+   * over whatever it emitted.
+   */
+  @Override
+  public Iterator> apply(WindowedValue value) {
+    // Start from a fresh buffer so results of earlier calls are not returned again.
+    outputs = Lists.newArrayList();
+    doFnRunner.startBundle();
+    doFnRunner.processElement(value);
+    doFnRunner.finishBundle();
+    return outputs.iterator();
+  }
+
+  /**
+   * Buffers an element emitted on the main tag; any other tag is rejected with a
+   * {@link RuntimeException}.
+   */
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  @Override
+  public void output(TupleTag tag, WindowedValue output) {
+    if (!mainTag.equals(tag)) {
+      throw new RuntimeException("output is not of main tag");
+    }
+    outputs.add((WindowedValue) output);
+  }
+}
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/BoundedSourceWrapper.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/BoundedSourceWrapper.java
new file mode 100644
index 000000000000..f25d113e5c9d
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/BoundedSourceWrapper.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators.io;
+
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+import java.io.IOException;
+
+/**
+ * Wrapper over a Beam {@link BoundedSource} for the Gearpump DataSource API.
+ *
+ * <p>Everything except reader creation is inherited from {@link GearpumpSource}; this class
+ * only supplies the reader by delegating to the wrapped source.
+ */
+public class BoundedSourceWrapper extends GearpumpSource {
+
+ // The Beam source that readers are created from.
+ private final BoundedSource source;
+
+ /**
+ * @param source the bounded source to read from
+ * @param options pipeline options, handed to the {@link GearpumpSource} constructor
+ */
+ public BoundedSourceWrapper(BoundedSource source, PipelineOptions options) {
+ super(options);
+ this.source = source;
+ }
+
+
+ /**
+ * Creates the reader by calling {@code createReader(options)} on the wrapped source.
+ */
+ @Override
+ protected Source.Reader createReader(PipelineOptions options) throws IOException {
+ return source.createReader(options);
+ }
+}
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
new file mode 100644
index 000000000000..892ccc3b39e9
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators.io;
+
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.gearpump.Message;
+import org.apache.gearpump.streaming.source.DataSource;
+import org.apache.gearpump.streaming.task.TaskContext;
+
+import org.joda.time.Instant;
+
+import java.io.IOException;
+
+/**
+ * Common methods for {@link BoundedSourceWrapper} and {@link UnboundedSourceWrapper}: adapts
+ * a Beam {@link Source.Reader} to the Gearpump {@link DataSource} API.
+ *
+ * <p>The pipeline options are kept as JSON bytes written with Jackson in the constructor and
+ * parsed back in {@link #open}, where the reader is created and started. The reader stays
+ * open between {@link #read()} calls and is released by {@link #close()}, or earlier if
+ * opening or reading fails.
+ */
+public abstract class GearpumpSource implements DataSource {
+
+  protected final byte[] serializedOptions;
+
+  protected Source.Reader reader;
+  // True while the reader is positioned on an element that read() has not emitted yet.
+  protected boolean available = false;
+
+  /**
+   * @param options pipeline options; serialized to JSON bytes immediately
+   * @throws RuntimeException if the options cannot be serialized
+   */
+  public GearpumpSource(PipelineOptions options) {
+    try {
+      this.serializedOptions = new ObjectMapper().writeValueAsBytes(options);
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Creates the Beam reader this source pulls elements from.
+   *
+   * @param options the pipeline options parsed back from their serialized form
+   */
+  protected abstract Source.Reader createReader(PipelineOptions options) throws IOException;
+
+  /**
+   * Parses the serialized options, creates the reader and starts it.
+   *
+   * <p>The reader is left open on success so that {@link #read()} can use it. It is closed
+   * here only when opening fails.
+   *
+   * @throws RuntimeException wrapping any failure while parsing, creating or starting
+   */
+  @Override
+  public void open(TaskContext context, long startTime) {
+    try {
+      PipelineOptions options = new ObjectMapper()
+          .readValue(serializedOptions, PipelineOptions.class);
+      this.reader = createReader(options);
+      // Position the reader on its first element; without this read() never emits anything.
+      this.available = reader.start();
+    } catch (Exception e) {
+      throw closeAfterFailure(e);
+    }
+  }
+
+  /**
+   * Emits the element the reader is positioned on, then advances the reader.
+   *
+   * @return the element as a {@link WindowedValue} in the global window, stamped with the
+   *     reader's timestamp, or {@code null} when no element is currently available
+   * @throws RuntimeException wrapping any reader failure; the reader is closed first
+   */
+  @Override
+  public Message read() {
+    if (!available) {
+      return null;
+    }
+    try {
+      // Capture the current element and timestamp before advance() moves past them.
+      T data = reader.getCurrent();
+      Instant timestamp = reader.getCurrentTimestamp();
+      available = reader.advance();
+      return Message.apply(
+          WindowedValue.of(data, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING),
+          timestamp.getMillis());
+    } catch (Exception e) {
+      throw closeAfterFailure(e);
+    }
+  }
+
+  /**
+   * Closes the reader if one was created.
+   *
+   * @throws RuntimeException wrapping an {@link IOException} raised by the reader
+   */
+  @Override
+  public void close() {
+    try {
+      if (reader != null) {
+        reader.close();
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Wraps {@code cause} for rethrowing and releases the reader. A failure of the close
+   * itself is attached as suppressed so it does not hide the original cause.
+   */
+  private RuntimeException closeAfterFailure(Exception cause) {
+    RuntimeException failure = new RuntimeException(cause);
+    try {
+      close();
+    } catch (RuntimeException closeFailure) {
+      failure.addSuppressed(closeFailure);
+    }
+    return failure;
+  }
+}
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/UnboundedSourceWrapper.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/UnboundedSourceWrapper.java
new file mode 100644
index 000000000000..b39f29f4219a
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/UnboundedSourceWrapper.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators.io;
+
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+import java.io.IOException;
+
+/**
+ * Wrapper over a Beam {@link UnboundedSource} for the Gearpump DataSource API.
+ *
+ * <p>Everything except reader creation is inherited from {@link GearpumpSource}; this class
+ * only supplies the reader by delegating to the wrapped source.
+ */
+public class UnboundedSourceWrapper
+ extends GearpumpSource {
+
+ // The Beam source that readers are created from.
+ private final UnboundedSource source;
+
+ /**
+ * @param source the unbounded source to read from
+ * @param options pipeline options, handed to the {@link GearpumpSource} constructor
+ */
+ public UnboundedSourceWrapper(UnboundedSource source,
+ PipelineOptions options) {
+ super(options);
+ this.source = source;
+ }
+
+ /**
+ * Creates the reader from the wrapped source. {@code null} is passed as the second
+ * argument (the checkpoint mark), so no checkpoint is ever supplied to the reader.
+ */
+ @Override
+ protected Source.Reader createReader(PipelineOptions options) throws IOException {
+ return source.createReader(options, null);
+ }
+}
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java
new file mode 100644
index 000000000000..24055f7e4d0a
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators.io;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+import org.joda.time.Instant;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import javax.annotation.Nullable;
+
+/**
+ * Unbounded source that reads from a Java {@link Iterable}.
+ *
+ * <p>The values are encoded with the supplied coder when the source is constructed and are
+ * decoded again, one at a time, by the reader.
+ */
+public class ValuesSource extends UnboundedSource {
+
+  // The input values in encoded form.
+  private final Iterable values;
+  private final Coder coder;
+
+  /**
+   * @param values the elements this source will produce
+   * @param coder the coder used both to encode the values here and to decode them on read
+   * @throws RuntimeException if encoding any value fails
+   */
+  public ValuesSource(Iterable values, Coder coder) {
+    this.coder = coder;
+    this.values = encode(values, coder);
+  }
+
+  /**
+   * Encodes every value into its own byte array, preserving iteration order.
+   */
+  private Iterable encode(Iterable values, Coder coder) {
+    List encoded = new LinkedList<>();
+    try {
+      for (T value : values) {
+        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        coder.encode(value, buffer, Coder.Context.OUTER);
+        encoded.add(buffer.toByteArray());
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return encoded;
+  }
+
+  /**
+   * Never splits: the result is always a single-element list holding this source.
+   */
+  @Override
+  public java.util.List extends UnboundedSource> generateInitialSplits(
+      int desiredNumSplits, PipelineOptions options) throws Exception {
+    return Collections.singletonList(this);
+  }
+
+  /**
+   * Creates a reader over the encoded values; {@code checkpointMark} is ignored.
+   */
+  @Override
+  public UnboundedReader createReader(PipelineOptions options,
+      @Nullable CheckpointMark checkpointMark) {
+    return new ValuesReader<>(values, coder, this);
+  }
+
+  /**
+   * Always {@code null}: this source provides no checkpoint mark coder.
+   */
+  @Nullable
+  @Override
+  public Coder getCheckpointMarkCoder() {
+    return null;
+  }
+
+  /**
+   * Performs no validation.
+   */
+  @Override
+  public void validate() {
+  }
+
+  /**
+   * Returns the coder given at construction time.
+   */
+  @Override
+  public Coder getDefaultOutputCoder() {
+    return coder;
+  }
+
+  /**
+   * Reader that walks the encoded values and decodes each one as it becomes current.
+   */
+  private static class ValuesReader extends UnboundedReader implements Serializable {
+
+    private final Iterable values;
+    private final Coder coder;
+    private final UnboundedSource source;
+    // Created in start(); transient, so it is not part of the serialized form.
+    private transient Iterator iterator;
+    // The element decoded by the last successful advance().
+    private T current;
+
+    public ValuesReader(Iterable values, Coder coder,
+        UnboundedSource source) {
+      this.values = values;
+      this.coder = coder;
+      this.source = source;
+    }
+
+    /**
+     * Decodes one encoded element with the coder.
+     */
+    private T decode(byte[] bytes) throws IOException {
+      try (ByteArrayInputStream in = new ByteArrayInputStream(bytes)) {
+        return coder.decode(in, Coder.Context.OUTER);
+      }
+    }
+
+    /**
+     * Obtains the iterator if there is none yet, then moves to the first element.
+     */
+    @Override
+    public boolean start() throws IOException {
+      if (iterator == null) {
+        iterator = values.iterator();
+      }
+      return advance();
+    }
+
+    /**
+     * Decodes the next element into {@code current}.
+     *
+     * @return {@code false} when the values are exhausted, {@code true} otherwise
+     */
+    @Override
+    public boolean advance() throws IOException {
+      if (!iterator.hasNext()) {
+        return false;
+      }
+      current = decode(iterator.next());
+      return true;
+    }
+
+    /**
+     * Returns the element decoded by the last successful {@link #advance()}.
+     */
+    @Override
+    public T getCurrent() throws NoSuchElementException {
+      return current;
+    }
+
+    /**
+     * Returns the wall-clock time of the call, not a time taken from the element.
+     */
+    @Override
+    public Instant getCurrentTimestamp() throws NoSuchElementException {
+      return Instant.now();
+    }
+
+    /**
+     * Nothing to release.
+     */
+    @Override
+    public void close() throws IOException {
+    }
+
+    /**
+     * Returns the wall-clock time of the call.
+     */
+    @Override
+    public Instant getWatermark() {
+      return Instant.now();
+    }
+
+    /**
+     * Always {@code null}: this reader produces no checkpoint marks.
+     */
+    @Override
+    public CheckpointMark getCheckpointMark() {
+      return null;
+    }
+
+    /**
+     * Returns the source this reader was created with.
+     */
+    @Override
+    public UnboundedSource getCurrentSource() {
+      return source;
+    }
+  }
+}
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java
new file mode 100644
index 000000000000..608ad7c85135
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java
@@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators.utils;
+
+import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.DoFnRunner;
+import org.apache.beam.sdk.util.DoFnRunners;
+import org.apache.beam.sdk.util.ExecutionContext;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.SimpleDoFnRunner;
+import org.apache.beam.sdk.util.SystemDoFnInternal;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.UserCodeException;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingInternals;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+
+import org.joda.time.Instant;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * a serializable {@link SimpleDoFnRunner}.
+ */
+public class GearpumpDoFnRunner implements DoFnRunner,
+ Serializable {
+
+ private final DoFn fn;
+ private final transient PipelineOptions options;
+ private final SideInputReader sideInputReader;
+ private final DoFnRunners.OutputManager outputManager;
+ private final TupleTag mainOutputTag;
+ private final List> sideOutputTags;
+ private final ExecutionContext.StepContext stepContext;
+ private final WindowFn, ?> windowFn;
+ private DoFnContext context;
+
+ /**
+ * Stores everything needed to build the {@code DoFnContext} later; the context itself is
+ * created lazily in {@link #startBundle()}, not here.
+ *
+ * @param pipelineOptions options exposed to the {@link DoFn} through its context
+ * @param doFn the function to run
+ * @param sideInputReader reader handed to the context for side input access
+ * @param outputManager receiver handed to the context for emitted elements
+ * @param mainOutputTag tag of the main output
+ * @param sideOutputTags tags of the declared side outputs
+ * @param stepContext step context handed to the {@code DoFnContext}
+ * @param windowingStrategy strategy whose {@link WindowFn} is kept; may be {@code null},
+ * in which case the stored window function is {@code null} too
+ */
+ public GearpumpDoFnRunner(
+ GearpumpPipelineOptions pipelineOptions,
+ DoFn doFn,
+ SideInputReader sideInputReader,
+ DoFnRunners.OutputManager outputManager,
+ TupleTag mainOutputTag,
+ List> sideOutputTags,
+ ExecutionContext.StepContext stepContext,
+ WindowingStrategy, ?> windowingStrategy) {
+ this.fn = doFn;
+ this.options = pipelineOptions;
+ this.sideInputReader = sideInputReader;
+ this.outputManager = outputManager;
+ this.mainOutputTag = mainOutputTag;
+ this.sideOutputTags = sideOutputTags;
+ this.stepContext = stepContext;
+ this.windowFn = windowingStrategy == null ? null : windowingStrategy.getWindowFn();
+ }
+
+ /**
+ * Creates the {@code DoFnContext} on first use, then calls the {@link DoFn}'s
+ * {@code startBundle} with it. Anything thrown on the way, including during context
+ * creation, goes through {@code wrapUserCodeException}.
+ */
+ @Override
+ public void startBundle() {
+ try {
+ if (context == null) {
+ // Built once, on the first bundle; later bundles reuse the same context.
+ context = new DoFnContext<>(
+ options, fn, sideInputReader, outputManager,
+ mainOutputTag, sideOutputTags, stepContext, windowFn);
+ }
+ // May run user code, hence the surrounding try/catch.
+ fn.startBundle(context);
+ } catch (Throwable t) {
+ throw wrapUserCodeException(t);
+ }
+ }
+
+ /**
+ * Passes {@code elem} to the {@link DoFn}.
+ *
+ * <p>An element in at most one window is processed as is. So is a multi-window element
+ * when the {@link DoFn} does not implement {@link DoFn.RequiresWindowAccess} and the
+ * side input reader is empty. Otherwise the element is processed once per window.
+ */
+ @Override
+ public void processElement(WindowedValue elem) {
+ if (elem.getWindows().size() <= 1) {
+ invokeProcessElement(elem);
+ return;
+ }
+ if (!DoFn.RequiresWindowAccess.class.isAssignableFrom(fn.getClass())
+ && context.sideInputReader.isEmpty()) {
+ // Nothing window-specific can be observed, so one call covers all windows.
+ invokeProcessElement(elem);
+ return;
+ }
+ // A fresh single-window value per window: more allocation than mutating one value in
+ // place, but simpler.
+ for (BoundedWindow window : elem.getWindows()) {
+ invokeProcessElement(WindowedValue.of(
+ elem.getValue(), elem.getTimestamp(), window, elem.getPane()));
+ }
+ }
+
+ /**
+ * Calls the {@link DoFn}'s {@code finishBundle} with the context created in
+ * {@link #startBundle()}; anything it throws goes through {@code wrapUserCodeException}.
+ */
+ @Override
+ public void finishBundle() {
+ // This can contain user code. Wrap it in case it throws an exception.
+ try {
+ fn.finishBundle(context);
+ } catch (Throwable t) {
+ // Exception in user code.
+ throw wrapUserCodeException(t);
+ }
+ }
+
+ /**
+ * Builds a new process context for {@code elem} and calls the {@link DoFn}'s
+ * {@code processElement} with it.
+ *
+ * <p>Only {@link Exception}s are caught and wrapped here, whereas the bundle methods
+ * catch {@link Throwable}; an {@link Error} therefore propagates unwrapped.
+ */
+ private void invokeProcessElement(WindowedValue elem) {
+ final DoFn.ProcessContext processContext =
+ new DoFnProcessContext<>(fn, context, elem);
+ // This can contain user code. Wrap it in case it throws an exception.
+ try {
+ fn.processElement(processContext);
+ } catch (Exception ex) {
+ throw wrapUserCodeException(ex);
+ }
+ }
+
+ /**
+ * Converts {@code t} with {@link UserCodeException#wrapIf}, asking for wrapping only when
+ * the {@link DoFn} is not a system one.
+ *
+ * <p>Despite the declared return type this method never returns: it throws the result
+ * itself. The return type only lets callers write {@code throw wrapUserCodeException(t)}.
+ */
+ private RuntimeException wrapUserCodeException(Throwable t) {
+ throw UserCodeException.wrapIf(!isSystemDoFn(), t);
+ }
+
+ /**
+ * Tells whether the {@link DoFn}'s class carries the {@link SystemDoFnInternal}
+ * annotation.
+ */
+ private boolean isSystemDoFn() {
+ return fn.getClass().isAnnotationPresent(SystemDoFnInternal.class);
+ }
+
+ /**
+ * A concrete implementation of {@code DoFn.Context} used for running a {@link DoFn}.
+ *
+ * @param the type of the DoFn's (main) input elements
+ * @param the type of the DoFn's (main) output elements
+ */
+ private static class DoFnContext
+ extends DoFn.Context {
+ private static final int MAX_SIDE_OUTPUTS = 1000;
+
+ final transient PipelineOptions options;
+ final DoFn fn;
+ final SideInputReader sideInputReader;
+ final DoFnRunners.OutputManager outputManager;
+ final TupleTag mainOutputTag;
+ final ExecutionContext.StepContext stepContext;
+ final WindowFn, ?> windowFn;
+
+ /**
+ * The set of known output tags, some of which may be undeclared, so we can throw an
+ * exception when it exceeds {@link #MAX_SIDE_OUTPUTS}.
+ */
+ private final Set> outputTags;
+
+ /**
+ * Creates the context as an inner instance of {@code fn} and records the collaborators
+ * it was given.
+ *
+ * <p>The set of known output tags starts with the main output tag plus every declared
+ * side output tag. {@code setupDelegateAggregators()} is called last, after all fields
+ * are assigned.
+ */
+ public DoFnContext(PipelineOptions options,
+ DoFn fn,
+ SideInputReader sideInputReader,
+ DoFnRunners.OutputManager outputManager,
+ TupleTag mainOutputTag,
+ List> sideOutputTags,
+ ExecutionContext.StepContext stepContext,
+ WindowFn, ?> windowFn) {
+ // DoFn.Context needs an enclosing DoFn instance, hence the qualified super call.
+ fn.super();
+ this.options = options;
+ this.fn = fn;
+ this.sideInputReader = sideInputReader;
+ this.outputManager = outputManager;
+ this.mainOutputTag = mainOutputTag;
+ this.outputTags = Sets.newHashSet();
+
+ outputTags.add(mainOutputTag);
+ for (TupleTag> sideOutputTag : sideOutputTags) {
+ outputTags.add(sideOutputTag);
+ }
+
+ this.stepContext = stepContext;
+ this.windowFn = windowFn;
+ super.setupDelegateAggregators();
+ }
+
+ //////////////////////////////////////////////////////////////////////////////
+
+ /**
+ * Returns the options this context was created with. The field is {@code transient}, so
+ * it is not part of the serialized form.
+ */
+ @Override
+ public PipelineOptions getPipelineOptions() {
+ return options;
+ }
+
+ WindowedValue makeWindowedValue(
+ T output, Instant timestamp, Collection windows, PaneInfo pane) {
+ final Instant inputTimestamp = timestamp;
+
+ if (timestamp == null) {
+ timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
+ }
+
+ if (windows == null) {
+ try {
+ // The windowFn can never succeed at accessing the element, so its type does not
+ // matter here
+ @SuppressWarnings("unchecked")
+ WindowFn