From 9478f4117de3a2d0ea40614ed4cb801918610724 Mon Sep 17 00:00:00 2001 From: manuzhang Date: Tue, 15 Mar 2016 16:15:16 +0800 Subject: [PATCH] [BEAM-79] add Gearpump runner --- runners/gearpump/README.md | 22 + runners/gearpump/pom.xml | 296 ++++++++++ .../gearpump/GearpumpPipelineOptions.java | 67 +++ .../gearpump/GearpumpPipelineResult.java | 42 ++ .../gearpump/GearpumpPipelineRunner.java | 193 +++++++ .../GearpumpPipelineRunnerRegistrar.java | 63 +++ .../gearpump/GearpumpPipelineTranslator.java | 139 +++++ .../runners/gearpump/TestGearpumpRunner.java | 64 +++ .../gearpump/examples/StreamingWordCount.java | 105 ++++ .../examples/UnboundedTextSource.java | 138 +++++ .../translators/CreateValuesTranslator.java | 49 ++ .../FlattenPCollectionTranslator.java | 47 ++ .../translators/GroupByKeyTranslator.java | 103 ++++ .../ParDoBoundMultiTranslator.java | 154 ++++++ .../translators/ParDoBoundTranslator.java | 54 ++ .../translators/ReadBoundedTranslator.java | 44 ++ .../translators/ReadUnboundedTranslator.java | 46 ++ .../translators/TransformTranslator.java | 31 ++ .../translators/TranslationContext.java | 95 ++++ .../translators/functions/DoFnFunction.java | 88 +++ .../translators/io/BoundedSourceWrapper.java | 44 ++ .../translators/io/GearpumpSource.java | 100 ++++ .../io/UnboundedSourceWrapper.java | 45 ++ .../gearpump/translators/io/ValuesSource.java | 164 ++++++ .../translators/utils/GearpumpDoFnRunner.java | 513 ++++++++++++++++++ .../utils/NoOpSideInputReader.java | 48 ++ .../translators/utils/NoOpStepContext.java | 71 +++ runners/pom.xml | 11 + .../apache/beam/sdk/testing/TestPipeline.java | 2 + 29 files changed, 2838 insertions(+) create mode 100644 runners/gearpump/README.md create mode 100644 runners/gearpump/pom.xml create mode 100644 runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineOptions.java create mode 100644 runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java create mode 100644 
runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java create mode 100644 runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunnerRegistrar.java create mode 100644 runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java create mode 100644 runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java create mode 100644 runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java create mode 100644 runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/UnboundedTextSource.java create mode 100644 runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateValuesTranslator.java create mode 100644 runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionTranslator.java create mode 100644 runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java create mode 100644 runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java create mode 100644 runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java create mode 100644 runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadBoundedTranslator.java create mode 100644 runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadUnboundedTranslator.java create mode 100644 runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TransformTranslator.java create mode 100644 runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java create mode 100644 runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java create mode 100644 
runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/BoundedSourceWrapper.java create mode 100644 runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java create mode 100644 runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/UnboundedSourceWrapper.java create mode 100644 runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java create mode 100644 runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java create mode 100644 runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpSideInputReader.java create mode 100644 runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java diff --git a/runners/gearpump/README.md b/runners/gearpump/README.md new file mode 100644 index 000000000000..ad043faea4c7 --- /dev/null +++ b/runners/gearpump/README.md @@ -0,0 +1,22 @@ + + +## Gearpump Beam Runner + +The Gearpump Beam runner allows users to execute pipelines written using the Apache Beam programming API with Apache Gearpump (incubating) as an execution engine. 
\ No newline at end of file diff --git a/runners/gearpump/pom.xml b/runners/gearpump/pom.xml new file mode 100644 index 000000000000..c725daecba80 --- /dev/null +++ b/runners/gearpump/pom.xml @@ -0,0 +1,296 @@ + + + + + 4.0.0 + + + org.apache.beam + beam-runners-parent + 0.2.0-incubating-SNAPSHOT + ../pom.xml + + + beam-runners-gearpump + + Apache Beam :: Runners :: Gearpump + jar + + + UTF-8 + UTF-8 + 0.8.1-SNAPSHOT + + + + + apache.snapshots + Apache Development Snapshot Repository + https://repository.apache.org/content/repositories/snapshots/ + + false + + + true + + + + gearpump-shaded-repo + Vincent at Bintray + http://dl.bintray.com/fvunicorn/maven + + + + + + org.apache.gearpump + gearpump-streaming_2.11 + ${gearpump.version} + provided + + + org.apache.gearpump + gearpump-core_2.11 + ${gearpump.version} + provided + + + com.google.code.findbugs + jsr305 + + + + + org.apache.gearpump + gearpump-daemon_2.11 + ${gearpump.version} + provided + + + org.apache.gearpump + gearpump-experimental-cgroup_2.11 + + + + + com.typesafe + config + provided + 1.3.0 + + + org.scala-lang + scala-library + 2.11.8 + provided + + + org.apache.beam + beam-sdks-java-core + + + org.slf4j + slf4j-jdk14 + + + com.google.collections + google-collections + + + + + org.apache.beam + beam-runners-core-java + + + com.google.code.findbugs + annotations + 3.0.1 + + + org.slf4j + slf4j-api + + + joda-time + joda-time + + + com.fasterxml.jackson.core + jackson-core + + + com.fasterxml.jackson.core + jackson-databind + + + com.fasterxml.jackson.core + jackson-annotations + + + com.google.http-client + google-http-client + + + com.google.guava + guava + + + junit + junit + test + + + org.hamcrest + hamcrest-all + test + + + org.apache.beam + beam-sdks-java-core + test-jar + test + + + org.slf4j + slf4j-jdk14 + + + + + org.mockito + mockito-all + test + + + com.google.auto.service + auto-service + 1.0-rc2 + + + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + true + true + + + + + 
+ + + org.apache.maven.plugins + maven-compiler-plugin + + + + + maven-failsafe-plugin + + + + integration-test + verify + + + + + 1 + -Dlog4j.configuration=log4j-test.properties -XX:-UseGCOverheadLimit + + + + + + org.apache.maven.plugins + maven-surefire-plugin + 2.19.1 + + + runnable-on-service-tests + + org.apache.beam.sdk.testing.RunnableOnService + none + true + + org.apache.beam:beam-sdks-java-core + org.apache.beam:beam-runners-java-core + + + + + org.apache.beam.sdk.io.BigQueryIOTest, + org.apache.beam.sdk.io.CountingInputTest, + org.apache.beam.sdk.io.CountingSourceTest, + org.apache.beam.sdk.testing.PAssertTest, + org.apache.beam.sdk.transforms.ApproximateUniqueTest, + org.apache.beam.sdk.transforms.CombineTest, + org.apache.beam.sdk.transforms.CombineFnsTest, + org.apache.beam.sdk.transforms.CountTest, + org.apache.beam.sdk.transforms.FlattenTest, + org.apache.beam.sdk.transforms.ParDoTest, + org.apache.beam.sdk.transforms.SampleTest, + org.apache.beam.sdk.transforms.ViewTest, + org.apache.beam.sdk.transforms.join.CoGroupByKeyTest + + + + org.apache.beam.sdk.transforms.windowing.WindowingTest, + org.apache.beam.sdk.util.ReshuffleTest + + + + + [ + "--runner=TestGearpumpRunner", + "--streaming=true" + ] + + + + + + + + + + org.apache.maven.plugins + maven-assembly-plugin + + + jar-with-dependencies + + + + + + org.apache.maven.plugins + maven-checkstyle-plugin + + + + \ No newline at end of file diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineOptions.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineOptions.java new file mode 100644 index 000000000000..5b6ee960745d --- /dev/null +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineOptions.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.gearpump; + +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; + +import com.fasterxml.jackson.annotation.JsonIgnore; + +import org.apache.gearpump.cluster.client.ClientContext; +import org.apache.gearpump.cluster.embedded.EmbeddedCluster; + +import java.util.Map; + +/** + * Options that configure the Gearpump pipeline. 
+ */ +public interface GearpumpPipelineOptions extends PipelineOptions { + + /** + * Application name used by the runner; when unset, {@link GearpumpPipelineRunner} + * falls back to its built-in default name. + */ + @Description("set unique application name for Gearpump runner") + void setApplicationName(String name); + + String getApplicationName(); + + /** + * Parallelism for Gearpump processors; defaults to 1 (see {@link #getParallelism()}). + */ + @Description("set parallelism for Gearpump processor") + void setParallelism(int parallelism); + + @Default.Integer(1) + int getParallelism(); + + /** + * Extra Kryo serializer registrations; the runner adds these on top of its own default + * entries, so a user entry replaces a default entry that has the same key. + * NOTE(review): keys look like class names and values like serializer names - confirm. + */ + @Description("register Kryo serializers") + void setSerializers(Map serializers); + + @JsonIgnore + Map getSerializers(); + + /** + * Embedded cluster for tests; when set, the runner takes its client context from this + * cluster instead of creating one from the cluster configuration. + */ + @Description("set EmbeddedCluster for tests") + void setEmbeddedCluster(EmbeddedCluster cluster); + + @JsonIgnore + EmbeddedCluster getEmbeddedCluster(); + + /** + * Client context of the application; {@link GearpumpPipelineRunner#run} stores the + * context it used here so that callers can query or close it afterwards. + */ + void setClientContext(ClientContext clientContext); + + @JsonIgnore + @Description("get client context to query application status") + ClientContext getClientContext(); + +} + diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java new file mode 100644 index 000000000000..bc2714727259 --- /dev/null +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.gearpump; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.runners.AggregatorRetrievalException; +import org.apache.beam.sdk.runners.AggregatorValues; +import org.apache.beam.sdk.transforms.Aggregator; + +/** + * Result of executing a {@link Pipeline} with Gearpump. + */ +public class GearpumpPipelineResult implements PipelineResult { + /** + * Reports {@link State#UNKNOWN} rather than {@code null}: this result does not track the + * application, but callers must still be able to compare or switch on the returned state. + */ + @Override + public State getState() { + return State.UNKNOWN; + } + + /** + * Aggregators are not supported by this runner. + * + * @throws AggregatorRetrievalException always, wrapping an {@link UnsupportedOperationException} + */ + @Override + public AggregatorValues getAggregatorValues(Aggregator aggregator) + throws AggregatorRetrievalException { + throw new AggregatorRetrievalException( + "PipelineResult getAggregatorValues not supported in Gearpump pipeline", + new UnsupportedOperationException()); + } +} diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java new file mode 100644 index 000000000000..660d7039737d --- /dev/null +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.gearpump; + +import org.apache.beam.runners.gearpump.translators.TranslationContext; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsValidator; +import org.apache.beam.sdk.runners.PipelineRunner; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.AssignWindows; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.POutput; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigValueFactory; + +import org.apache.gearpump.cluster.ClusterConfig; +import org.apache.gearpump.cluster.UserConfig; +import org.apache.gearpump.cluster.client.ClientContext; +import org.apache.gearpump.cluster.embedded.EmbeddedCluster; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp; + +import java.util.HashMap; +import java.util.Map; + +/** + * A {@link PipelineRunner} that executes the operations in the + * pipeline by first translating them to Gearpump Stream DSL + * and then executing them on a Gearpump cluster. + *

> + * This is based on DataflowPipelineRunner. + */ +@SuppressWarnings({"rawtypes", "unchecked"}) +public class GearpumpPipelineRunner extends PipelineRunner { + + private final GearpumpPipelineOptions options; + + private static final String GEARPUMP_SERIALIZERS = "gearpump.serializers"; + private static final String DEFAULT_APPNAME = "beam_gearpump_app"; + + public GearpumpPipelineRunner(GearpumpPipelineOptions options) { + this.options = options; + } + + public static GearpumpPipelineRunner fromOptions(PipelineOptions options) { + GearpumpPipelineOptions pipelineOptions = + PipelineOptionsValidator.validate(GearpumpPipelineOptions.class, options); + return new GearpumpPipelineRunner(pipelineOptions); + } + + + /** + * Replaces a few transforms before falling back to the default application: + * {@link Window.Bound} is wrapped in {@link AssignWindowsAndSetStrategy}, a + * {@link Flatten.FlattenPCollectionList} over an empty list becomes an empty + * {@link Create}, and {@link Create.Values} directly yields a bounded primitive output. + */ + @Override + public OutputT apply( + PTransform transform, InputT input) { + if (Window.Bound.class.equals(transform.getClass())) { + return (OutputT) super.apply( + new AssignWindowsAndSetStrategy((Window.Bound) transform), input); + } else if (Flatten.FlattenPCollectionList.class.equals(transform.getClass()) + && ((PCollectionList) input).size() == 0) { + return (OutputT) Pipeline.applyTransform(input, Create.of()); + } else if (Create.Values.class.equals(transform.getClass())) { + return (OutputT) PCollection + .createPrimitiveOutputInternal( + input.getPipeline(), + WindowingStrategy.globalDefault(), + PCollection.IsBounded.BOUNDED); + } else { + return super.apply(transform, input); + } + } + + /** + * Translates {@code pipeline} onto a {@link JavaStreamApp} and runs that application. + * The client context that was used is stored back into the options. + * + * @return a non-null {@link GearpumpPipelineResult} + */ + @Override + public GearpumpPipelineResult run(Pipeline pipeline) { + String appName = options.getApplicationName(); + if (null == appName) { + appName = DEFAULT_APPNAME; + } + Config config = registerSerializers(ClusterConfig.defaultConfig(), + options.getSerializers()); + ClientContext clientContext = getClientContext(options, config); + options.setClientContext(clientContext); + JavaStreamApp streamApp = new JavaStreamApp( + appName, clientContext, UserConfig.empty()); + TranslationContext translationContext = new TranslationContext(streamApp, options); + 
GearpumpPipelineTranslator translator = new GearpumpPipelineTranslator(translationContext); + translator.translate(pipeline); + streamApp.run(); + + return new GearpumpPipelineResult(); + } + + private ClientContext getClientContext(GearpumpPipelineOptions options, Config config) { + EmbeddedCluster cluster = options.getEmbeddedCluster(); + if (cluster != null) { + return cluster.newClientContext(); + } else { + return ClientContext.apply(config); + } + } + + /** + * register class with default kryo serializers. + */ + private Config registerSerializers(Config config, Map userSerializers) { + Map serializers = new HashMap<>(); + serializers.put("org.apache.beam.sdk.util.WindowedValue$TimestampedValueInSingleWindow", ""); + serializers.put("org.apache.beam.sdk.transforms.windowing.PaneInfo", ""); + serializers.put("org.apache.beam.sdk.transforms.windowing.PaneInfo$Timing", ""); + serializers.put("org.joda.time.Instant", ""); + serializers.put("org.apache.beam.sdk.values.KV", ""); + serializers.put("org.apache.beam.sdk.transforms.windowing.IntervalWindow", ""); + serializers.put("org.apache.beam.sdk.values.TimestampedValue", ""); + if (userSerializers != null && !userSerializers.isEmpty()) { + serializers.putAll(userSerializers); + } + return config.withValue(GEARPUMP_SERIALIZERS, ConfigValueFactory.fromMap(serializers)); + } + + + /** + * copied from DirectPipelineRunner. 
+ * used to replace Window.Bound till window function is added to Gearpump Stream DSL + */ + private static class AssignWindowsAndSetStrategy + extends PTransform, PCollection> { + + private final Window.Bound wrapped; + + public AssignWindowsAndSetStrategy(Window.Bound wrapped) { + this.wrapped = wrapped; + } + + @Override + public PCollection apply(PCollection input) { + WindowingStrategy outputStrategy = + wrapped.getOutputStrategyInternal(input.getWindowingStrategy()); + + WindowFn windowFn = + (WindowFn) outputStrategy.getWindowFn(); + + if (!windowFn.isNonMerging()) { + throw new UnsupportedOperationException( + "merging window is not supported in Gearpump pipeline"); + } + + // If the Window.Bound transform only changed parts other than the WindowFn, then + // we skip AssignWindows even though it should be harmless in a perfect world. + // The world is not perfect, and a GBK may have set it to InvalidWindows to forcibly + // crash if another GBK is performed without explicitly setting the WindowFn. So we skip + // AssignWindows in this case. 
+ if (wrapped.getWindowFn() == null) { + return input.apply("Identity", ParDo.of(new IdentityFn())) + .setWindowingStrategyInternal(outputStrategy); + } else { + return input + .apply("AssignWindows", new AssignWindows<>(windowFn)) + .setWindowingStrategyInternal(outputStrategy); + } + } + } + + private static class IdentityFn extends DoFn { + @Override + public void processElement(ProcessContext c) { + c.output(c.element()); + } + } +} diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunnerRegistrar.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunnerRegistrar.java new file mode 100644 index 000000000000..2b9e89e6a4c4 --- /dev/null +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunnerRegistrar.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.beam.runners.gearpump; + +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsRegistrar; +import org.apache.beam.sdk.runners.PipelineRunner; +import org.apache.beam.sdk.runners.PipelineRunnerRegistrar; + +import com.google.auto.service.AutoService; +import com.google.common.collect.ImmutableList; + +/** + * Contains the {@link PipelineRunnerRegistrar} and {@link PipelineOptionsRegistrar} for the + * {@link GearpumpPipelineRunner}. + * + * {@link AutoService} will register Gearpump's implementations of the {@link PipelineRunner} + * and {@link PipelineOptions} as available pipeline runner services. + */ +public class GearpumpPipelineRunnerRegistrar { + private GearpumpPipelineRunnerRegistrar() { } + + /** + * Registers the {@link GearpumpPipelineRunner} and the {@link TestGearpumpRunner}, so that + * both can be selected through the runner option. + */ + @AutoService(PipelineRunnerRegistrar.class) + public static class Runner implements PipelineRunnerRegistrar { + + @Override + public Iterable>> getPipelineRunners() { + return ImmutableList.>>of( + GearpumpPipelineRunner.class, + TestGearpumpRunner.class); + } + } + + /** + * Registers the {@link GearpumpPipelineOptions}. + */ + @AutoService(PipelineOptionsRegistrar.class) + public static class Options implements PipelineOptionsRegistrar { + + @Override + public Iterable> getPipelineOptions() { + return ImmutableList.>of(GearpumpPipelineOptions.class); + } + } +} diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java new file mode 100644 index 000000000000..59f0df7e8d3e --- /dev/null +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.gearpump; + +import org.apache.beam.runners.gearpump.translators.CreateValuesTranslator; +import org.apache.beam.runners.gearpump.translators.FlattenPCollectionTranslator; +import org.apache.beam.runners.gearpump.translators.GroupByKeyTranslator; +import org.apache.beam.runners.gearpump.translators.ParDoBoundMultiTranslator; +import org.apache.beam.runners.gearpump.translators.ParDoBoundTranslator; +import org.apache.beam.runners.gearpump.translators.ReadBoundedTranslator; +import org.apache.beam.runners.gearpump.translators.ReadUnboundedTranslator; +import org.apache.beam.runners.gearpump.translators.TransformTranslator; +import org.apache.beam.runners.gearpump.translators.TranslationContext; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.runners.TransformTreeNode; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PValue; + +import org.apache.gearpump.util.Graph; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import 
java.util.Map; + +/** + * {@link GearpumpPipelineTranslator} knows how to translate {@link Pipeline} objects + * into Gearpump {@link Graph}. + */ +@SuppressWarnings({"rawtypes", "unchecked"}) +public class GearpumpPipelineTranslator implements Pipeline.PipelineVisitor { + + private static final Logger LOG = LoggerFactory.getLogger( + GearpumpPipelineTranslator.class); + + /** + * A map from {@link PTransform} subclass to the corresponding + * {@link TransformTranslator} to use to translate that transform. + */ + private static final Map, TransformTranslator> + transformTranslators = new HashMap<>(); + + private final TranslationContext translationContext; + + static { + // register TransformTranslators + registerTransformTranslator(ParDo.Bound.class, new ParDoBoundTranslator()); + registerTransformTranslator(Read.Unbounded.class, new ReadUnboundedTranslator()); + registerTransformTranslator(Read.Bounded.class, new ReadBoundedTranslator()); + registerTransformTranslator(GroupByKey.class, new GroupByKeyTranslator()); + registerTransformTranslator(Flatten.FlattenPCollectionList.class, + new FlattenPCollectionTranslator()); + registerTransformTranslator(ParDo.BoundMulti.class, new ParDoBoundMultiTranslator()); + registerTransformTranslator(Create.Values.class, new CreateValuesTranslator()); + } + + /** + * Creates a translator that hands {@code translationContext} to every + * {@link TransformTranslator} it invokes. + */ + public GearpumpPipelineTranslator(TranslationContext translationContext) { + this.translationContext = translationContext; + } + + /** + * Traverses {@code pipeline} topologically with this translator as the visitor. + */ + public void translate(Pipeline pipeline) { + pipeline.traverseTopologically(this); + } + + /** + * Logs the composite and always asks the traversal to enter it. + */ + @Override + public CompositeBehavior enterCompositeTransform(TransformTreeNode node) { + LOG.debug("entering composite transform {}", node.getTransform()); + return CompositeBehavior.ENTER_TRANSFORM; + } + + @Override + public void leaveCompositeTransform(TransformTreeNode node) { + LOG.debug("leaving composite transform {}", node.getTransform()); + } + + /** + * Looks up the translator registered for the transform's exact class, marks the node as + * the current transform on the context, and lets the translator translate it. + * + * @throws IllegalStateException if no translator is registered for the transform's class + */ + @Override + public void visitPrimitiveTransform(TransformTreeNode node) { + LOG.debug("visiting 
transform {}", node.getTransform()); + PTransform transform = node.getTransform(); + TransformTranslator translator = getTransformTranslator(transform.getClass()); + if (null == translator) { + throw new IllegalStateException( + "no translator registered for " + transform); + } + translationContext.setCurrentTransform(node); + translator.translate(transform, translationContext); + } + + /** + * Values are not translated here; the visit is only logged. + */ + @Override + public void visitValue(PValue value, TransformTreeNode producer) { + LOG.debug("visiting value {}", value); + } + + /** + * Records that instances of the specified PTransform class + * should be translated by default by the corresponding + * {@link TransformTranslator}. + */ + private static void registerTransformTranslator( + Class transformClass, + TransformTranslator transformTranslator) { + if (transformTranslators.put(transformClass, transformTranslator) != null) { + throw new IllegalArgumentException( + "defining multiple translators for " + transformClass); + } + } + + /** + * Returns the {@link TransformTranslator} to use for instances of the + * specified PTransform class, or null if none registered. + */ + private + TransformTranslator getTransformTranslator(Class transformClass) { + return transformTranslators.get(transformClass); + } + + +} diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java new file mode 100644 index 000000000000..cedd31ff4ab2 --- /dev/null +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.gearpump; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsValidator; +import org.apache.beam.sdk.runners.PipelineRunner; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.POutput; + +import org.apache.gearpump.cluster.embedded.EmbeddedCluster; + +/** + * Gearpump {@link PipelineRunner} for tests, which uses {@link EmbeddedCluster}. 
+ */ +public class TestGearpumpRunner extends PipelineRunner { + + private final GearpumpPipelineRunner delegate; + private final EmbeddedCluster cluster; + + /** + * Starts an {@link EmbeddedCluster}, publishes it through the options, and creates the + * delegate runner from those options. + */ + private TestGearpumpRunner(GearpumpPipelineOptions options) { + cluster = EmbeddedCluster.apply(); + cluster.start(); + options.setEmbeddedCluster(cluster); + delegate = GearpumpPipelineRunner.fromOptions(options); + } + + public static TestGearpumpRunner fromOptions(PipelineOptions options) { + GearpumpPipelineOptions pipelineOptions = + PipelineOptionsValidator.validate(GearpumpPipelineOptions.class, options); + return new TestGearpumpRunner(pipelineOptions); + } + + /** + * Runs the pipeline on the delegate and stops the embedded cluster afterwards. The stop + * happens in {@code finally} so the cluster is not leaked when the delegate throws. + */ + @Override + public GearpumpPipelineResult run(Pipeline pipeline) { + try { + return delegate.run(pipeline); + } finally { + cluster.stop(); + } + } + + @Override + public + OutputT apply(PTransform transform, InputT input) { + return delegate.apply(transform, input); + } +} diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java new file mode 100644 index 000000000000..c51289d5fefe --- /dev/null +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.gearpump.examples; + +import org.apache.beam.runners.gearpump.GearpumpPipelineOptions; +import org.apache.beam.runners.gearpump.GearpumpPipelineRunner; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; + +import org.apache.gearpump.cluster.client.ClientContext; +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Streaming word count example on the Gearpump runner. + * + * Reads lines from {@link UnboundedTextSource}, splits them into words, assigns the words to + * fixed ten-second windows, counts each word and formats every count as a string. + */ +public class StreamingWordCount { + + /** + * Splits each input line into words and emits every non-empty word. + * Lines that are empty after trimming are counted in the "emptyLines" aggregator. + */ static class ExtractWordsFn extends DoFn { + private final Aggregator emptyLines = + createAggregator("emptyLines", new Sum.SumLongFn()); + + @Override + public void processElement(ProcessContext c) { + if (c.element().trim().isEmpty()) { + emptyLines.addValue(1L); + } + + // Split the line into words. + String[] words = c.element().split("[^a-zA-Z']+"); + + // Output each word encountered into the output PCollection.
+ for (String word : words) { + if (!word.isEmpty()) { + c.output(word); + } + } + } + } + + /** + * Formats each element as "key - value @ timestamp", logs the row at debug level + * and emits it. + */ static class FormatAsStringFn extends DoFn, String> { + private static final Logger LOG = LoggerFactory.getLogger(FormatAsStringFn.class); + + @Override + public void processElement(ProcessContext c) { + String row = c.element().getKey() + + " - " + c.element().getValue() + + " @ " + c.timestamp().toString(); + LOG.debug("output {}", row); + c.output(row); + } + } + + + /** + * Builds the word count pipeline with application name "StreamingWordCount", the + * {@link GearpumpPipelineRunner} and a parallelism of 1, runs it, and then closes the + * {@link ClientContext} obtained from the options. + */ public static void main(String[] args) { + GearpumpPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() + .as(GearpumpPipelineOptions.class); + options.setApplicationName("StreamingWordCount"); + options.setRunner(GearpumpPipelineRunner.class); + options.setParallelism(1); + Pipeline p = Pipeline.create(options); + + PCollection> wordCounts = + p.apply(Read.from(new UnboundedTextSource())) + .apply(ParDo.of(new ExtractWordsFn())) + .apply(Window.into(FixedWindows.of(Duration.standardSeconds(10)))) + .apply(Count.perElement()); + + wordCounts.apply(ParDo.of(new FormatAsStringFn())); + + p.run(); + + ClientContext clientContext = options.getClientContext(); + clientContext.close(); + + } +} diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/UnboundedTextSource.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/UnboundedTextSource.java new file mode 100644 index 000000000000..caf066c9710a --- /dev/null +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/UnboundedTextSource.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.gearpump.examples; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.options.PipelineOptions; + +import org.joda.time.Instant; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Collections; +import java.util.List; +import java.util.NoSuchElementException; + +import javax.annotation.Nullable; + +/** + * Unbounded example source that endlessly emits a fixed set of text lines. + * + * The source is never split, ignores checkpoint marks and provides no checkpoint mark coder. + */ +public class UnboundedTextSource extends UnboundedSource { + + /** + * Returns this source as the only split, whatever number of splits is requested. + */ + @Override + public List> generateInitialSplits( + int desiredNumSplits, PipelineOptions options) throws Exception { + return Collections.>singletonList(this); + } + + /** + * Creates a new reader over this source; the checkpoint mark is ignored. + */ + @Override + public UnboundedReader createReader(PipelineOptions options, + @Nullable CheckpointMark checkpointMark) { + return new UnboundedTextReader(this); + } + + @Nullable + @Override + public Coder getCheckpointMarkCoder() { + return null; + } + + @Override + public void validate() { + } + + @Override + public Coder getDefaultOutputCoder() { + return StringUtf8Coder.of(); + } + + /** + * Reader that cycles through the fixed text lines, one line per call to advance.
+ */ + public static class UnboundedTextReader extends UnboundedReader implements Serializable { + + private static final long serialVersionUID = 7526472295622776147L; + + private final UnboundedTextSource source; + + private final String[] texts = new String[]{"foo foo foo bar bar", "foo foo bar bar bar"}; + private long index = 0; + + private String currentRecord; + + private Instant currentTimestamp; + + public UnboundedTextReader(UnboundedTextSource source) { + this.source = source; + } + + /** + * Positions the reader on the first line with timestamp 0. + */ + @Override + public boolean start() throws IOException { + currentRecord = texts[0]; + currentTimestamp = new Instant(0); + return true; + } + + /** + * Moves to the next line, wrapping around the fixed texts, and advances the + * timestamp by 1000 milliseconds per record. + */ + @Override + public boolean advance() throws IOException { + index++; + // Reduce modulo the array length while still a long, then narrow. Casting first + // would turn the index negative once it exceeds Integer.MAX_VALUE. + currentRecord = texts[(int) (index % texts.length)]; + currentTimestamp = new Instant(index * 1000); + + return true; + } + + @Override + public byte[] getCurrentRecordId() throws NoSuchElementException { + return new byte[0]; + } + + @Override + public String getCurrent() throws NoSuchElementException { + return this.currentRecord; + } + + @Override + public Instant getCurrentTimestamp() throws NoSuchElementException { + return currentTimestamp; + } + + @Override + public void close() throws IOException { + } + + /** + * Reports the timestamp of the current record as the watermark. + */ + @Override + public Instant getWatermark() { + return currentTimestamp; + } + + @Override + public CheckpointMark getCheckpointMark() { + return null; + } + + @Override + public UnboundedSource getCurrentSource() { + return this.source; + } + } +} diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateValuesTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateValuesTranslator.java new file mode 100644 index 000000000000..452127aa2c9b --- /dev/null +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateValuesTranslator.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.gearpump.translators; + +import org.apache.beam.runners.gearpump.translators.io.UnboundedSourceWrapper; +import org.apache.beam.runners.gearpump.translators.io.ValuesSource; +import org.apache.beam.sdk.coders.CannotProvideCoderException; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.util.WindowedValue; + +import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; + +/** + * Wraps elements from Create.Values into an {@link UnboundedSource}.
+ * Mainly used for tests. + */ +public class CreateValuesTranslator implements TransformTranslator> { + + /** + * Builds a {@link ValuesSource} from the transform's elements and default output coder, + * wraps it in an {@link UnboundedSourceWrapper} and registers the resulting source stream + * as the output of the transform. A {@link CannotProvideCoderException} is rethrown + * wrapped in a {@link RuntimeException}. + */ @Override + public void translate(Create.Values transform, TranslationContext context) { + try { + UnboundedSourceWrapper unboundedSourceWrapper = new UnboundedSourceWrapper<>( + new ValuesSource<>(transform.getElements(), + transform.getDefaultOutputCoder(context.getInput(transform))), + context.getPipelineOptions()); + JavaStream> sourceStream = context.getSourceStream(unboundedSourceWrapper); + context.setOutputStream(context.getOutput(transform), sourceStream); + } catch (CannotProvideCoderException e) { + throw new RuntimeException(e); + } + } +} diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionTranslator.java new file mode 100644 index 000000000000..b06d5a840817 --- /dev/null +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionTranslator.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.beam.runners.gearpump.translators; + +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.values.PCollection; + +import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; + +/** + * Flatten.FlattenPCollectionList is translated to the Gearpump merge function. + * Note that only a two-way merge is working for now. + */ +public class FlattenPCollectionTranslator implements + TransformTranslator> { + + /** + * Merges the streams of all input collections, in list order, into a single stream and + * registers it as the output of the transform. The first input stream is taken as is and + * every further stream is merged into it under the transform's name. + */ + @Override + public void translate(Flatten.FlattenPCollectionList transform, TranslationContext context) { + JavaStream merged = null; + for (PCollection collection : context.getInput(transform).getAll()) { + JavaStream inputStream = context.getInputStream(collection); + if (null == merged) { + merged = inputStream; + } else { + merged = merged.merge(inputStream, transform.getName()); + } + } + context.setOutputStream(context.getOutput(transform), merged); + } +} diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java new file mode 100644 index 000000000000..f36b908f842d --- /dev/null +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.gearpump.translators; + +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.KV; + +import com.google.common.collect.Iterables; + +import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; +import org.apache.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction; +import org.apache.gearpump.streaming.javaapi.dsl.functions.GroupByFunction; +import org.apache.gearpump.streaming.javaapi.dsl.functions.MapFunction; +import org.apache.gearpump.streaming.javaapi.dsl.functions.ReduceFunction; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + +/** + * {@link GroupByKey} is translated to the Gearpump groupBy function.
+ */ +public class GroupByKeyTranslator implements TransformTranslator> { + /** + * Chains four Gearpump operators on the input stream: a flatMap that keys every element by + * key and window, a groupBy on that pair using the configured parallelism, a map that turns + * each value into a single-element iterable, and a reduce that concatenates the iterables. + */ @Override + public void translate(GroupByKey transform, TranslationContext context) { + JavaStream>> inputStream = + context.getInputStream(context.getInput(transform)); + int parallelism = context.getPipelineOptions().getParallelism(); + JavaStream>>> outputStream = inputStream + .flatMap(new KeyedByKeyAndWindow(), "keyed_by_Key_and_Window") + .groupBy(new GroupByKeyAndWindow(), parallelism, "group_by_Key_and_Window") + .map(new ExtractKeyValue(), "extract_Key_and_Value") + .reduce(new MergeValue(), "merge_value"); + + context.setOutputStream(context.getOutput(transform), outputStream); + } + + /** + * Emits one copy of the element per window it belongs to, re-keyed by the pair of + * original key and window and assigned to that single window. + */ private static class KeyedByKeyAndWindow implements + FlatMapFunction>, WindowedValue, V>>> { + + @Override + public Iterator, V>>> apply(WindowedValue> wv) { + List, V>>> ret = new ArrayList<>(wv.getWindows().size + ()); + for (BoundedWindow window : wv.getWindows()) { + KV keyWin = KV.of(wv.getValue().getKey(), window); + ret.add(WindowedValue.of(KV.of(keyWin, wv.getValue().getValue()), + wv.getTimestamp(), window, wv.getPane())); + } + return ret.iterator(); + } + } + + /** + * Groups elements by their key, which is the pair of original key and window. + */ private static class GroupByKeyAndWindow implements + GroupByFunction, V>>, KV> { + + @Override + public KV apply(WindowedValue, V>> wv) { + return wv.getValue().getKey(); + } + } + + /** + * Restores the original key and wraps the value in a single-element list, keeping the + * timestamp, windows and pane of the element. + */ private static class ExtractKeyValue implements + MapFunction, V>>, + WindowedValue>>> { + @Override + public WindowedValue>> apply(WindowedValue, V>> wv) { + return WindowedValue.of(KV.of(wv.getValue().getKey().getKey(), + (Iterable) Collections.singletonList(wv.getValue().getValue())), + wv.getTimestamp(), wv.getWindows(), wv.getPane()); + } + } + + /** + * Concatenates the value iterables of two elements. The key, timestamp, windows and pane + * of the result are all taken from the first element. + */ private static class MergeValue implements + ReduceFunction>>> { + @Override + public WindowedValue>> apply(WindowedValue>> wv1, + WindowedValue>> wv2) { + return WindowedValue.of(KV.of(wv1.getValue().getKey(), + Iterables.concat(wv1.getValue().getValue(), wv2.getValue().getValue())),
+ wv1.getTimestamp(), wv1.getWindows(), wv1.getPane()); + } + } +} diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java new file mode 100644 index 000000000000..af5bcbcf4c88 --- /dev/null +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.beam.runners.gearpump.translators; + +import org.apache.beam.runners.gearpump.GearpumpPipelineOptions; +import org.apache.beam.runners.gearpump.translators.utils.GearpumpDoFnRunner; +import org.apache.beam.runners.gearpump.translators.utils.NoOpSideInputReader; +import org.apache.beam.runners.gearpump.translators.utils.NoOpStepContext; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.util.DoFnRunner; +import org.apache.beam.sdk.util.DoFnRunners; +import org.apache.beam.sdk.util.SideInputReader; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; + +import com.google.common.collect.Lists; + +import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; +import org.apache.gearpump.streaming.javaapi.dsl.functions.FilterFunction; +import org.apache.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction; +import org.apache.gearpump.streaming.javaapi.dsl.functions.MapFunction; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * {@link ParDo.BoundMulti} is translated to Gearpump flatMap function + * with {@link DoFn} wrapped in {@link DoFnMultiFunction}.
The outputs are + * further filtered with Gearpump filter function by output tag. + */ +@SuppressWarnings({"rawtypes", "unchecked"}) +public class ParDoBoundMultiTranslator implements + TransformTranslator> { + + /** + * Runs the DoFn as a flatMap that tags every output with its {@link TupleTag}, then derives + * one stream per declared output by filtering on the tag and stripping it again. + */ + @Override + public void translate(ParDo.BoundMulti transform, TranslationContext context) { + PCollection inputT = (PCollection) context.getInput(transform); + JavaStream> inputStream = context.getInputStream(inputT); + Map, PCollection> outputs = context.getOutput(transform).getAll(); + + JavaStream, OutputT>>> outputStream = inputStream.flatMap( + new DoFnMultiFunction<>( + context.getPipelineOptions(), + transform.getFn(), + transform.getMainOutputTag(), + transform.getSideOutputTags(), + inputT.getWindowingStrategy(), + new NoOpSideInputReader() + ), transform.getName()); + for (Map.Entry, PCollection> output : outputs.entrySet()) { + JavaStream> taggedStream = outputStream + .filter(new FilterByOutputTag<>((TupleTag) output.getKey()) + , "filter_by_output_tag") + .map(new ExtractOutput(), "extract output"); + + context.setOutputStream(output.getValue(), taggedStream); + } + } + + /** + * Gearpump {@link FlatMapFunction} wrapper over Beam {@link DoFn} that also acts as the + * output manager and tags every emitted value with its output tag.
+ */ + private static class DoFnMultiFunction implements + FlatMapFunction, WindowedValue, OutputT>>>, + DoFnRunners.OutputManager { + + private final DoFnRunner doFnRunner; + private final List, OutputT>>> outputs = Lists + .newArrayList(); + + public DoFnMultiFunction( + GearpumpPipelineOptions pipelineOptions, + DoFn doFn, + TupleTag mainOutputTag, + TupleTagList sideOutputTags, + WindowingStrategy windowingStrategy, + SideInputReader sideInputReader) { + this.doFnRunner = new GearpumpDoFnRunner<>( + pipelineOptions, + doFn, + sideInputReader, + this, + mainOutputTag, + sideOutputTags.getAll(), + new NoOpStepContext(), + windowingStrategy + ); + } + + /** + * Processes one element as its own bundle and returns only the outputs of that element. + */ + @Override + public Iterator, OutputT>>> apply(WindowedValue wv) { + // The buffer is an instance field: without clearing it, every call would emit the + // outputs of all previously processed elements again. + outputs.clear(); + + doFnRunner.startBundle(); + doFnRunner.processElement(wv); + doFnRunner.finishBundle(); + + // Hand out a snapshot so that a later call cannot invalidate the returned iterator. + return Lists.newArrayList(outputs).iterator(); + } + + /** + * Buffers an output of the DoFn together with the tag it was emitted to. + */ + @Override + public void output(TupleTag tag, WindowedValue output) { + KV, OutputT> kv = KV.of((TupleTag) tag, + (OutputT) output.getValue()); + outputs.add(WindowedValue.of(kv, output.getTimestamp(), + output.getWindows(), output.getPane())); + } + } + + /** + * Keeps only the elements whose tag equals the given output tag. + */ + private static class FilterByOutputTag implements + FilterFunction, OutputT>>> { + + private final TupleTag tupleTag; + + public FilterByOutputTag(TupleTag tupleTag) { + this.tupleTag = tupleTag; + } + + @Override + public boolean apply(WindowedValue, OutputT>> wv) { + return wv.getValue().getKey().equals(tupleTag); + } + } + + /** + * Drops the output tag and keeps the value with its timestamp, windows and pane. + */ + private static class ExtractOutput implements + MapFunction, OutputT>>, WindowedValue> { + + @Override + public WindowedValue apply(WindowedValue, OutputT>> wv) { + return WindowedValue.of(wv.getValue().getValue(), wv.getTimestamp(), + wv.getWindows(), wv.getPane()); + } + } +} diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java new file mode 100644 index 000000000000..689bc08e0f47 ---
/dev/null +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.gearpump.translators; + +import org.apache.beam.runners.gearpump.translators.functions.DoFnFunction; +import org.apache.beam.runners.gearpump.translators.utils.NoOpSideInputReader; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.PCollection; + +import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; + + +/** + * {@link ParDo.Bound} is translated to Gearpump flatMap function + * with {@link DoFn} wrapped in {@link DoFnFunction}.
+ */ +public class ParDoBoundTranslator implements + TransformTranslator> { + + /** + * Wraps the transform's {@link DoFn} in a {@link DoFnFunction}, applies it to the input + * stream as a flatMap named after the transform and registers the result as the output. + * The windowing strategy is read from the output collection and side inputs are served + * by a {@link NoOpSideInputReader}. + */ @Override + public void translate(ParDo.Bound transform, TranslationContext context) { + DoFn doFn = transform.getFn(); + PCollection output = context.getOutput(transform); + WindowingStrategy windowingStrategy = output.getWindowingStrategy(); + + DoFnFunction doFnFunction = new DoFnFunction<>(context.getPipelineOptions(), + doFn, windowingStrategy, new NoOpSideInputReader()); + JavaStream> inputStream = + context.getInputStream(context.getInput(transform)); + JavaStream> outputStream = + inputStream.flatMap(doFnFunction, transform.getName()); + + context.setOutputStream(context.getOutput(transform), outputStream); + } +} diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadBoundedTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadBoundedTranslator.java new file mode 100644 index 000000000000..478d58f87b1e --- /dev/null +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadBoundedTranslator.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.beam.runners.gearpump.translators; + +import org.apache.beam.runners.gearpump.translators.io.BoundedSourceWrapper; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; +import org.apache.gearpump.streaming.source.DataSource; + +/** + * {@link Read.Bounded} is translated to Gearpump source function + * and {@link BoundedSource} is wrapped into Gearpump {@link DataSource}. + */ +public class ReadBoundedTranslator implements TransformTranslator> { + + /** + * Wraps the transform's {@link BoundedSource} in a {@link BoundedSourceWrapper}, creates a + * source stream from it and registers that stream as the output of the transform. + */ @Override + public void translate(Read.Bounded transform, TranslationContext context) { + BoundedSource boundedSource = transform.getSource(); + BoundedSourceWrapper sourceWrapper = new BoundedSourceWrapper<>(boundedSource, + context.getPipelineOptions()); + JavaStream> sourceStream = context.getSourceStream(sourceWrapper); + + context.setOutputStream(context.getOutput(transform), sourceStream); + } + +} diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadUnboundedTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadUnboundedTranslator.java new file mode 100644 index 000000000000..7e12a9c629fd --- /dev/null +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadUnboundedTranslator.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.gearpump.translators; + +import org.apache.beam.runners.gearpump.translators.io.UnboundedSourceWrapper; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.util.WindowedValue; + +import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; +import org.apache.gearpump.streaming.source.DataSource; + +/** + * {@link Read.Unbounded} is translated to Gearpump source function + * and {@link UnboundedSource} is wrapped into Gearpump {@link DataSource}. + */ + +public class ReadUnboundedTranslator implements TransformTranslator> { + + /** + * Wraps the transform's {@link UnboundedSource} in an {@link UnboundedSourceWrapper}, creates + * a source stream from it and registers that stream as the output of the transform. + */ @Override + public void translate(Read.Unbounded transform, TranslationContext context) { + UnboundedSource unboundedSource = transform.getSource(); + UnboundedSourceWrapper unboundedSourceWrapper = new UnboundedSourceWrapper<>( + unboundedSource, context.getPipelineOptions()); + JavaStream> sourceStream = context.getSourceStream(unboundedSourceWrapper); + + context.setOutputStream(context.getOutput(transform), sourceStream); + } + +} diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TransformTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TransformTranslator.java new file mode 100644 index 000000000000..1ed6d5daa28e --- /dev/null +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TransformTranslator.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.gearpump.translators; + + +import org.apache.beam.sdk.transforms.PTransform; + +import java.io.Serializable; + +/** + * Translates a {@link PTransform} to Gearpump functions. + */ +public interface TransformTranslator extends Serializable { + /** Translates the given transform using the given {@link TranslationContext}. */ void translate(T transform, TranslationContext context); +} diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java new file mode 100644 index 000000000000..b9b2c7aeeb91 --- /dev/null +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.gearpump.translators; + +import static com.google.common.base.Preconditions.checkArgument; + +import org.apache.beam.runners.gearpump.GearpumpPipelineOptions; +import org.apache.beam.sdk.runners.TransformTreeNode; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.POutput; +import org.apache.beam.sdk.values.PValue; + +import org.apache.gearpump.cluster.UserConfig; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp; +import org.apache.gearpump.streaming.source.DataSource; + +import java.util.HashMap; +import java.util.Map; + +/** + * Maintains context data for {@link TransformTranslator}s. 
+ */ +@SuppressWarnings({"rawtypes", "unchecked"}) +public class TranslationContext { + + private final JavaStreamApp streamApp; + private final GearpumpPipelineOptions pipelineOptions; + private AppliedPTransform currentTransform; + private final Map> streams = new HashMap<>(); + + public TranslationContext(JavaStreamApp streamApp, GearpumpPipelineOptions pipelineOptions) { + this.streamApp = streamApp; + this.pipelineOptions = pipelineOptions; + + } + /** Records the transform being translated as an {@link AppliedPTransform} built from {@code treeNode}. */ + public void setCurrentTransform(TransformTreeNode treeNode) { + this.currentTransform = AppliedPTransform.of(treeNode.getFullName(), + treeNode.getInput(), treeNode.getOutput(), (PTransform) treeNode.getTransform()); + } + + public GearpumpPipelineOptions getPipelineOptions() { + return pipelineOptions; + } + /** Returns the stream registered for {@code input}, or {@code null} if none was registered. */ + public JavaStream getInputStream(PValue input) { + return (JavaStream) streams.get(input); + } + /** Registers {@code outputStream} for {@code output}; an existing registration is kept unchanged. */ + public void setOutputStream(PValue output, JavaStream outputStream) { + if (!streams.containsKey(output)) { + streams.put(output, outputStream); + } + } + /** Returns the input of the current transform; {@code transform} must be the current transform. */ + public InputT getInput(PTransform transform) { + return (InputT) getCurrentTransform(transform).getInput(); + } + /** Returns the output of the current transform; {@code transform} must be the current transform. */ + public OutputT getOutput(PTransform transform) { + return (OutputT) getCurrentTransform(transform).getOutput(); + } + /** Returns the current transform after checking that it is the same instance as {@code transform}. */ + private AppliedPTransform getCurrentTransform(PTransform transform) { + checkArgument( + currentTransform != null && currentTransform.getTransform() == transform, + "can only be called with current transform"); + return currentTransform; + } + /** Creates a source stream for {@code dataSource} using the parallelism from the pipeline options. */ + public JavaStream getSourceStream(DataSource dataSource) { + return streamApp.source(dataSource, pipelineOptions.getParallelism(), + UserConfig.empty(), "source"); + } + +} diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java new file mode 100644 index 000000000000..088fc14c10f8 --- /dev/null +++
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.gearpump.translators.functions; + +import org.apache.beam.runners.gearpump.GearpumpPipelineOptions; +import org.apache.beam.runners.gearpump.translators.utils.GearpumpDoFnRunner; +import org.apache.beam.runners.gearpump.translators.utils.NoOpStepContext; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.util.DoFnRunner; +import org.apache.beam.sdk.util.DoFnRunners; +import org.apache.beam.sdk.util.SideInputReader; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; + +import com.google.api.client.util.Lists; + +import org.apache.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction; + +import java.util.Iterator; +import java.util.List; + +/** + * Gearpump {@link FlatMapFunction} wrapper over Beam {@link DoFn}. 
+ */ +public class DoFnFunction implements + FlatMapFunction, WindowedValue>, DoFnRunners.OutputManager { + + private final TupleTag mainTag = new TupleTag() { + }; + private final DoFnRunner doFnRunner; + private List> outputs = Lists.newArrayList(); + /** Wraps {@code doFn} in a {@link GearpumpDoFnRunner} that reports its outputs back to this object. */ + public DoFnFunction( + GearpumpPipelineOptions pipelineOptions, + DoFn doFn, + WindowingStrategy windowingStrategy, + SideInputReader sideInputReader) { + this.doFnRunner = new GearpumpDoFnRunner<>( + pipelineOptions, + doFn, + sideInputReader, + this, + mainTag, + TupleTagList.empty().getAll(), + new NoOpStepContext(), + windowingStrategy + ); + } + /** Runs the DoFn on one element as its own bundle and returns the main-tag outputs it produced. */ + @Override + public Iterator> apply(WindowedValue value) { + outputs = Lists.newArrayList(); + + doFnRunner.startBundle(); + doFnRunner.processElement(value); + doFnRunner.finishBundle(); + + return outputs.iterator(); + } + /** Collects output for the main tag; any other tag is rejected with a {@link RuntimeException}. */ + @SuppressWarnings({"rawtypes", "unchecked"}) + @Override + public void output(TupleTag tag, WindowedValue output) { + if (mainTag.equals(tag)) { + outputs.add((WindowedValue) output); + } else { + throw new RuntimeException("output is not of main tag"); + } + } +} diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/BoundedSourceWrapper.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/BoundedSourceWrapper.java new file mode 100644 index 000000000000..f25d113e5c9d --- /dev/null +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/BoundedSourceWrapper.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.gearpump.translators.io; + +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.io.Source; +import org.apache.beam.sdk.options.PipelineOptions; + +import java.io.IOException; + +/** + * Wrapper that exposes a {@link BoundedSource} through the Gearpump DataSource API of {@link GearpumpSource}. + */ +public class BoundedSourceWrapper extends GearpumpSource { + + private final BoundedSource source; + + public BoundedSourceWrapper(BoundedSource source, PipelineOptions options) { + super(options); + this.source = source; + } + + /** Creates the reader by delegating to {@code createReader(options)} on the wrapped source. */ + @Override + protected Source.Reader createReader(PipelineOptions options) throws IOException { + return source.createReader(options); + } +} diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java new file mode 100644 index 000000000000..892ccc3b39e9 --- /dev/null +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.gearpump.translators.io; + +import org.apache.beam.sdk.io.Source; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowedValue; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +import org.apache.gearpump.Message; +import org.apache.gearpump.streaming.source.DataSource; +import org.apache.gearpump.streaming.task.TaskContext; + +import org.joda.time.Instant; + +import java.io.IOException; + +/** + * common methods for {@link BoundedSourceWrapper} and {@link UnboundedSourceWrapper}. 
+ */ +public abstract class GearpumpSource implements DataSource { + + protected final byte[] serializedOptions; + + protected Source.Reader reader; + protected boolean available = false; + /** Serializes {@code options} to JSON bytes so that {@link #open} can restore them. */ + public GearpumpSource(PipelineOptions options) { + try { + this.serializedOptions = new ObjectMapper().writeValueAsBytes(options); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + /** Creates the reader that {@link #open} starts and {@link #read()} drains. */ + protected abstract Source.Reader createReader(PipelineOptions options) throws IOException; + /** Restores the options, creates the reader and starts it; the reader is closed only if that fails. */ + @Override + public void open(TaskContext context, long startTime) { + try { + PipelineOptions options = new ObjectMapper() + .readValue(serializedOptions, PipelineOptions.class); + this.reader = createReader(options); + this.available = reader.start(); + } catch (Exception e) { + close(); + throw new RuntimeException(e); + } + } + /** Emits the current element stamped with its timestamp and advances; returns {@code null} when no element is available. */ + @Override + public Message read() { + Message message = null; + try { + if (available) { + T data = reader.getCurrent(); + Instant timestamp = reader.getCurrentTimestamp(); + available = reader.advance(); + message = Message.apply( + WindowedValue.of(data, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING), + timestamp.getMillis()); + } + } catch (Exception e) { + close(); + throw new RuntimeException(e); + } + return message; + } + /** Closes the reader if one was created; an {@link IOException} is rethrown unchecked. */ + @Override + public void close() { + try { + if (reader != null) { + reader.close(); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } +} diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/UnboundedSourceWrapper.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/UnboundedSourceWrapper.java new file mode 100644 index 000000000000..b39f29f4219a --- /dev/null +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/UnboundedSourceWrapper.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.gearpump.translators.io; + +import org.apache.beam.sdk.io.Source; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.options.PipelineOptions; + +import java.io.IOException; + +/** + * Wrapper that exposes an {@link UnboundedSource} through the Gearpump DataSource API of {@link GearpumpSource}. + */ +public class UnboundedSourceWrapper + extends GearpumpSource { + + private final UnboundedSource source; + + public UnboundedSourceWrapper(UnboundedSource source, + PipelineOptions options) { + super(options); + this.source = source; + } + /** Creates a reader on the wrapped source, passing {@code null} as the second (checkpoint mark) argument. */ + @Override + protected Source.Reader createReader(PipelineOptions options) throws IOException { + return source.createReader(options, null); + } +} diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java new file mode 100644 index 000000000000..24055f7e4d0a --- /dev/null +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.gearpump.translators.io; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.options.PipelineOptions; + +import org.joda.time.Instant; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.Serializable; +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.NoSuchElementException; + +import javax.annotation.Nullable; + +/** + * unbounded source that reads from a Java {@link Iterable}. 
+ */ +public class ValuesSource extends UnboundedSource { + + private final Iterable values; + private final Coder coder; + /** Encodes every value with {@code coder} up front and keeps only the encoded bytes. */ + public ValuesSource(Iterable values, Coder coder) { + this.values = encode(values, coder); + this.coder = coder; + } + /** Encodes each value in the outer coder context; an {@link IOException} is rethrown unchecked. */ + private Iterable encode(Iterable values, Coder coder) { + List bytes = new LinkedList<>(); + for (T t: values) { + try { + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + coder.encode(t, stream, Coder.Context.OUTER); + bytes.add(stream.toByteArray()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + return bytes; + } + /** Never splits: the only split returned is this source itself. */ + @Override + public java.util.List> generateInitialSplits( + int desiredNumSplits, PipelineOptions options) throws Exception { + return Collections.singletonList(this); + } + /** Creates a reader over the encoded values; {@code checkpointMark} is ignored. */ + @Override + public UnboundedReader createReader(PipelineOptions options, + @Nullable CheckpointMark checkpointMark) { + return new ValuesReader<>(values, coder, this); + } + + @Nullable + @Override + public Coder getCheckpointMarkCoder() { + return null; + } + + @Override + public void validate() { + } + + @Override + public Coder getDefaultOutputCoder() { + return coder; + } + /** Reader that decodes the stored bytes one element at a time. */ + private static class ValuesReader extends UnboundedReader implements Serializable { + + private final Iterable values; + private final Coder coder; + private final UnboundedSource source; + private transient Iterator iterator; + private T current; + + public ValuesReader(Iterable values, Coder coder, + UnboundedSource source) { + this.values = values; + this.coder = coder; + this.source = source; + } + + private T decode(byte[] bytes) throws IOException { + ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes); + try { + return coder.decode(inputStream, Coder.Context.OUTER); + } finally { + inputStream.close(); + } + } + /** Creates the iterator if it does not exist yet, then moves to the first element. */ + @Override + public boolean start() throws IOException { + if (null == iterator) { + iterator = values.iterator(); + } + return advance(); + } + + @Override + public boolean advance() throws
IOException { + if (iterator.hasNext()) { + current = decode(iterator.next()); + return true; + } else { + return false; + } + } + + @Override + public T getCurrent() throws NoSuchElementException { + return current; + } + /** Uses the time of the call, not a stored value, as the element timestamp. */ + @Override + public Instant getCurrentTimestamp() throws NoSuchElementException { + return Instant.now(); + } + + @Override + public void close() throws IOException { + } + + @Override + public Instant getWatermark() { + return Instant.now(); + } + + @Override + public CheckpointMark getCheckpointMark() { + return null; + } + + @Override + public UnboundedSource getCurrentSource() { + return source; + } + } +} diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java new file mode 100644 index 000000000000..608ad7c85135 --- /dev/null +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java @@ -0,0 +1,513 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.beam.runners.gearpump.translators.utils; + +import org.apache.beam.runners.gearpump.GearpumpPipelineOptions; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.DoFnRunner; +import org.apache.beam.sdk.util.DoFnRunners; +import org.apache.beam.sdk.util.ExecutionContext; +import org.apache.beam.sdk.util.SideInputReader; +import org.apache.beam.sdk.util.SimpleDoFnRunner; +import org.apache.beam.sdk.util.SystemDoFnInternal; +import org.apache.beam.sdk.util.TimerInternals; +import org.apache.beam.sdk.util.UserCodeException; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingInternals; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.util.state.StateInternals; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; + +import org.joda.time.Instant; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +/** + * a serializable {@link SimpleDoFnRunner}. 
+ */ +public class GearpumpDoFnRunner implements DoFnRunner, + Serializable { + + private final DoFn fn; + private final transient PipelineOptions options; + private final SideInputReader sideInputReader; + private final DoFnRunners.OutputManager outputManager; + private final TupleTag mainOutputTag; + private final List> sideOutputTags; + private final ExecutionContext.StepContext stepContext; + private final WindowFn windowFn; + private DoFnContext context; + /** Stores the collaborators; the window function is {@code null} when no windowing strategy is given. */ + public GearpumpDoFnRunner( + GearpumpPipelineOptions pipelineOptions, + DoFn doFn, + SideInputReader sideInputReader, + DoFnRunners.OutputManager outputManager, + TupleTag mainOutputTag, + List> sideOutputTags, + ExecutionContext.StepContext stepContext, + WindowingStrategy windowingStrategy) { + this.fn = doFn; + this.options = pipelineOptions; + this.sideInputReader = sideInputReader; + this.outputManager = outputManager; + this.mainOutputTag = mainOutputTag; + this.sideOutputTags = sideOutputTags; + this.stepContext = stepContext; + this.windowFn = windowingStrategy == null ? null : windowingStrategy.getWindowFn(); + } + /** Creates the shared context on first use, then calls the DoFn's {@code startBundle}. */ + @Override + public void startBundle() { + // This can contain user code. Wrap it in case it throws an exception. + try { + if (null == context) { + this.context = new DoFnContext<>( + options, + fn, + sideInputReader, + outputManager, + mainOutputTag, + sideOutputTags, + stepContext, + windowFn + ); + } + fn.startBundle(context); + } catch (Throwable t) { + // Exception in user code. + throw wrapUserCodeException(t); + } + } + /** Invokes the DoFn once per element, or once per window when the element is in several windows and the DoFn requires window access or the side input reader is not empty. */ + @Override + public void processElement(WindowedValue elem) { + if (elem.getWindows().size() <= 1 + || (!DoFn.RequiresWindowAccess.class.isAssignableFrom(fn.getClass()) + && context.sideInputReader.isEmpty())) { + invokeProcessElement(elem); + } else { + // We could modify the windowed value (and the processContext) to + // avoid repeated allocations, but this is more straightforward.
+ for (BoundedWindow window : elem.getWindows()) { + invokeProcessElement(WindowedValue.of( + elem.getValue(), elem.getTimestamp(), window, elem.getPane())); + } + } + } + + @Override + public void finishBundle() { + // This can contain user code. Wrap it in case it throws an exception. + try { + fn.finishBundle(context); + } catch (Throwable t) { + // Exception in user code. + throw wrapUserCodeException(t); + } + } + /** Runs the DoFn on {@code elem} with a fresh {@link DoFnProcessContext}. */ + private void invokeProcessElement(WindowedValue elem) { + final DoFn.ProcessContext processContext = + new DoFnProcessContext<>(fn, context, elem); + // This can contain user code. Wrap it in case it throws an exception. + try { + fn.processElement(processContext); + } catch (Exception ex) { + throw wrapUserCodeException(ex); + } + } + /** Always throws the result of {@code UserCodeException.wrapIf} instead of returning it. */ + private RuntimeException wrapUserCodeException(Throwable t) { + throw UserCodeException.wrapIf(!isSystemDoFn(), t); + } + /** Returns true when the DoFn's class carries the {@link SystemDoFnInternal} annotation. */ + private boolean isSystemDoFn() { + return fn.getClass().isAnnotationPresent(SystemDoFnInternal.class); + } + + /** + * A concrete implementation of {@code DoFn.Context} used for running a {@link DoFn}. + * + * @param <InputT> the type of the DoFn's (main) input elements + * @param <OutputT> the type of the DoFn's (main) output elements + */ + private static class DoFnContext + extends DoFn.Context { + private static final int MAX_SIDE_OUTPUTS = 1000; + + final transient PipelineOptions options; + final DoFn fn; + final SideInputReader sideInputReader; + final DoFnRunners.OutputManager outputManager; + final TupleTag mainOutputTag; + final ExecutionContext.StepContext stepContext; + final WindowFn windowFn; + + /** + * The set of known output tags, some of which may be undeclared, so we can throw an + * exception when it exceeds {@link #MAX_SIDE_OUTPUTS}.
+ */ + private final Set> outputTags; + + public DoFnContext(PipelineOptions options, + DoFn fn, + SideInputReader sideInputReader, + DoFnRunners.OutputManager outputManager, + TupleTag mainOutputTag, + List> sideOutputTags, + ExecutionContext.StepContext stepContext, + WindowFn windowFn) { + fn.super(); + this.options = options; + this.fn = fn; + this.sideInputReader = sideInputReader; + this.outputManager = outputManager; + this.mainOutputTag = mainOutputTag; + this.outputTags = Sets.newHashSet(); + + outputTags.add(mainOutputTag); + for (TupleTag sideOutputTag : sideOutputTags) { + outputTags.add(sideOutputTag); + } + + this.stepContext = stepContext; + this.windowFn = windowFn; + super.setupDelegateAggregators(); + } + + ////////////////////////////////////////////////////////////////////////////// + + @Override + public PipelineOptions getPipelineOptions() { + return options; + } + /** Builds a windowed value; a null timestamp becomes the minimum timestamp and null windows are assigned by the window function. */ + WindowedValue makeWindowedValue( + T output, Instant timestamp, Collection windows, PaneInfo pane) { + final Instant inputTimestamp = timestamp; + + if (timestamp == null) { + timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE; + } + + if (windows == null) { + try { + // The windowFn can never succeed at accessing the element, so its type does not + // matter here + @SuppressWarnings("unchecked") + WindowFn objectWindowFn = (WindowFn) windowFn; + windows = objectWindowFn.assignWindows(objectWindowFn.new AssignContext() { + @Override + public Object element() { + throw new UnsupportedOperationException( + "WindowFn attempted to access input element when none was available"); + } + + @Override + public Instant timestamp() { + if (inputTimestamp == null) { + throw new UnsupportedOperationException( + "WindowFn attempted to access input timestamp when none was available"); + } + return inputTimestamp; + } + + @Override + public BoundedWindow window() { + throw new UnsupportedOperationException( + "WindowFn attempted to access input windows when none were available"); + } + }); + } catch
(Exception e) { + throw UserCodeException.wrap(e); + } + } + + return WindowedValue.of(output, timestamp, windows, pane); + } + /** Reads {@code view} for the side-input window derived from {@code mainInputWindow}; unknown views are rejected. */ + public T sideInput(PCollectionView view, BoundedWindow mainInputWindow) { + if (!sideInputReader.contains(view)) { + throw new IllegalArgumentException("calling sideInput() with unknown view"); + } + BoundedWindow sideInputWindow = + view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(mainInputWindow); + return sideInputReader.get(view, sideInputWindow); + } + + void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection windows, + PaneInfo pane) { + outputWindowedValue(makeWindowedValue(output, timestamp, windows, pane)); + } + /** Sends the element to the main output and notes it on the step context when one is set. */ + void outputWindowedValue(WindowedValue windowedElem) { + outputManager.output(mainOutputTag, windowedElem); + if (stepContext != null) { + stepContext.noteOutput(windowedElem); + } + } + + protected void sideOutputWindowedValue(TupleTag tag, + T output, + Instant timestamp, + Collection windows, + PaneInfo pane) { + sideOutputWindowedValue(tag, makeWindowedValue(output, timestamp, windows, pane)); + } + + protected void sideOutputWindowedValue(TupleTag tag, WindowedValue windowedElem) { + if (!outputTags.contains(tag)) { + // This tag wasn't declared nor was it seen before during this execution. + // Thus, this must be a new, undeclared and unconsumed output. + // To prevent likely user errors, enforce the limit on the number of side + // outputs.
+ if (outputTags.size() >= MAX_SIDE_OUTPUTS) { + throw new IllegalArgumentException( + "the number of side outputs has exceeded a limit of " + MAX_SIDE_OUTPUTS); + } + outputTags.add(tag); + } + + outputManager.output(tag, windowedElem); + if (stepContext != null) { + stepContext.noteSideOutput(tag, windowedElem); + } + } + + // Following implementations of output, outputWithTimestamp, and sideOutput + // are only accessible in DoFn.startBundle and DoFn.finishBundle, and will be shadowed by + // ProcessContext's versions in DoFn.processElement. + @Override + public void output(OutputT output) { + outputWindowedValue(output, null, null, PaneInfo.NO_FIRING); + } + + @Override + public void outputWithTimestamp(OutputT output, Instant timestamp) { + outputWindowedValue(output, timestamp, null, PaneInfo.NO_FIRING); + } + + @Override + public void sideOutput(TupleTag tag, T output) { + Preconditions.checkNotNull(tag, "TupleTag passed to sideOutput cannot be null"); + sideOutputWindowedValue(tag, output, null, null, PaneInfo.NO_FIRING); + } + + @Override + public void sideOutputWithTimestamp(TupleTag tag, T output, Instant timestamp) { + Preconditions.checkNotNull(tag, "TupleTag passed to sideOutputWithTimestamp cannot be null"); + sideOutputWindowedValue(tag, output, timestamp, null, PaneInfo.NO_FIRING); + } + /** Aggregators are not supported: this always throws once the null check on {@code combiner} has passed. */ + @Override + protected Aggregator createAggregatorInternal( + String name, Combine.CombineFn combiner) { + Preconditions.checkNotNull(combiner, + "Combiner passed to createAggregator cannot be null"); + throw new UnsupportedOperationException("aggregator not supported in Gearpump runner"); + } + } + + + /** + * A concrete implementation of {@code DoFn.ProcessContext} used for + * running a {@link DoFn} over a single element.
+ * + * @param <InputT> the type of the DoFn's (main) input elements + * @param <OutputT> the type of the DoFn's (main) output elements + */ + private static class DoFnProcessContext + extends DoFn.ProcessContext { + + + final DoFn fn; + final DoFnContext context; + final WindowedValue windowedValue; + + public DoFnProcessContext(DoFn fn, + DoFnContext context, + WindowedValue windowedValue) { + fn.super(); + this.fn = fn; + this.context = context; + this.windowedValue = windowedValue; + } + + @Override + public PipelineOptions getPipelineOptions() { + return context.getPipelineOptions(); + } + + @Override + public InputT element() { + return windowedValue.getValue(); + } + /** Reads {@code view} using the element's only window; an element in no window falls back to the global window only when the window function is {@link GlobalWindows}. */ + @Override + public T sideInput(PCollectionView view) { + Preconditions.checkNotNull(view, "View passed to sideInput cannot be null"); + Iterator windowIter = windows().iterator(); + BoundedWindow window; + if (!windowIter.hasNext()) { + if (context.windowFn instanceof GlobalWindows) { + // TODO: Remove this once GroupByKeyOnly no longer outputs elements + // without windows + window = GlobalWindow.INSTANCE; + } else { + throw new IllegalStateException( + "sideInput called when main input element is not in any windows"); + } + } else { + window = windowIter.next(); + if (windowIter.hasNext()) { + throw new IllegalStateException( + "sideInput called when main input element is in multiple windows"); + } + } + return context.sideInput(view, window); + } + /** Returns the element's only window; allowed only when the DoFn implements {@code DoFn.RequiresWindowAccess}. */ + @Override + public BoundedWindow window() { + if (!(fn instanceof DoFn.RequiresWindowAccess)) { + throw new UnsupportedOperationException( + "window() is only available in the context of a DoFn marked as RequiresWindow."); + } + return Iterables.getOnlyElement(windows()); + } + + @Override + public PaneInfo pane() { + return windowedValue.getPane(); + } + + @Override + public void output(OutputT output) { + context.outputWindowedValue(windowedValue.withValue(output)); + } + + @Override + public void outputWithTimestamp(OutputT output, Instant timestamp) { +
context.outputWindowedValue(output, timestamp, + windowedValue.getWindows(), windowedValue.getPane()); + } + + @Override + public void sideOutput(TupleTag tag, T output) { + Preconditions.checkNotNull(tag, "Tag passed to sideOutput cannot be null"); + context.sideOutputWindowedValue(tag, windowedValue.withValue(output)); + } + + @Override + public void sideOutputWithTimestamp(TupleTag tag, T output, Instant timestamp) { + Preconditions.checkNotNull(tag, "Tag passed to sideOutputWithTimestamp cannot be null"); + context.sideOutputWindowedValue( + tag, output, timestamp, windowedValue.getWindows(), windowedValue.getPane()); + } + + @Override + public Instant timestamp() { + return windowedValue.getTimestamp(); + } + /** Returns all windows of the element being processed. */ + public Collection windows() { + return windowedValue.getWindows(); + } + /** Exposes windowing internals backed by the shared context and its step context. */ + @Override + public WindowingInternals windowingInternals() { + return new WindowingInternals() { + @Override + public void outputWindowedValue(OutputT output, Instant timestamp, + Collection windows, PaneInfo pane) { + context.outputWindowedValue(output, timestamp, windows, pane); + } + + @Override + public Collection windows() { + return windowedValue.getWindows(); + } + + @Override + public PaneInfo pane() { + return windowedValue.getPane(); + } + + @Override + public TimerInternals timerInternals() { + return context.stepContext.timerInternals(); + } + + @Override + public void writePCollectionViewData( + TupleTag tag, + Iterable> data, + Coder elemCoder) throws IOException { + @SuppressWarnings("unchecked") + Coder windowCoder = (Coder) context.windowFn.windowCoder(); + + context.stepContext.writePCollectionViewData( + tag, data, IterableCoder.of(WindowedValue.getFullCoder(elemCoder, windowCoder)), + window(), windowCoder); + } + + @Override + public StateInternals stateInternals() { + return context.stepContext.stateInternals(); + } + + @Override + public T sideInput(PCollectionView view, BoundedWindow mainInputWindow) { + return context.sideInput(view, mainInputWindow); + }
+ }; + } + /** Delegates aggregator creation to the shared context. */ + @Override + protected Aggregator + createAggregatorInternal( + String name, Combine.CombineFn combiner) { + return context.createAggregatorInternal(name, combiner); + } + } +} diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpSideInputReader.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpSideInputReader.java new file mode 100644 index 000000000000..600ebfb225d0 --- /dev/null +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpSideInputReader.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.gearpump.translators.utils; + +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.SideInputReader; +import org.apache.beam.sdk.values.PCollectionView; + +import java.io.Serializable; + +import javax.annotation.Nullable; + +/** + * no-op side input reader.
+ */ +public class NoOpSideInputReader implements SideInputReader, Serializable { + @Nullable + @Override + public <T> T get(PCollectionView<T> view, BoundedWindow window) { + return null; /* no view is ever readable, so there is never a value to hand back */ + } + + @Override + public <T> boolean contains(PCollectionView<T> view) { + return false; /* this reader holds no views */ + } + + @Override + public boolean isEmpty() { + return true; /* must agree with contains(): a reader holding no views is empty */ + } +} diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java new file mode 100644 index 000000000000..ce0935a38081 --- /dev/null +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.gearpump.translators.utils; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.ExecutionContext; +import org.apache.beam.sdk.util.TimerInternals; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.state.StateInternals; +import org.apache.beam.sdk.values.TupleTag; + +import java.io.IOException; +import java.io.Serializable; + +/** + * Serializable {@link ExecutionContext.StepContext}: notifications are ignored, accessors throw. + */ +public class NoOpStepContext implements ExecutionContext.StepContext, Serializable { + + @Override + public String getStepName() { + throw new UnsupportedOperationException("step name is not available"); + } + + @Override + public String getTransformName() { + throw new UnsupportedOperationException("transform name is not available"); + } + + @Override + public void noteOutput(WindowedValue<?> output) { /* intentionally empty */ + } + + @Override + public void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output) { /* intentionally empty */ + } + + @Override + public <T, W extends BoundedWindow> void writePCollectionViewData(TupleTag<?> tag, + Iterable<WindowedValue<T>> data, + Coder<Iterable<WindowedValue<T>>> dataCoder, W window, Coder<W> windowCoder) throws + IOException { /* intentionally empty: view data is discarded */ + } + + @Override + public StateInternals<?> stateInternals() { + throw new UnsupportedOperationException("state is not supported"); + } + + @Override + public TimerInternals timerInternals() { + throw new UnsupportedOperationException("timers are not supported"); + } +} diff --git a/runners/pom.xml b/runners/pom.xml index 612d4936fe34..3f985c5b4ac3 100644 --- a/runners/pom.xml +++ b/runners/pom.xml @@ -39,6 +39,17 @@ spark + + + java8 + + [1.8,) + + + gearpump + + + diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java index 0de3024c2cdc..eda78112c6b3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java @@ -100,6 +100,7 @@ public static 
TestPipeline create() { } public static TestPipeline fromOptions(PipelineOptions options) { + System.out.println(options); return new TestPipeline(PipelineRunner.fromOptions(options), options); } @@ -138,6 +139,7 @@ public static PipelineOptions testingPipelineOptions() { @Nullable String beamTestPipelineOptions = System.getProperty(PROPERTY_BEAM_TEST_PIPELINE_OPTIONS); + System.out.println("options " + beamTestPipelineOptions); PipelineOptions options = Strings.isNullOrEmpty(beamTestPipelineOptions) ? PipelineOptionsFactory.create()