diff --git a/.travis.yml b/.travis.yml
index 52e1d3a5cbd2..973618b10696 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -31,5 +31,5 @@ install:
script:
- travis_retry mvn versions:set -DnewVersion=manual_build
- - travis_retry mvn $MAVEN_OVERRIDE install -U
+ - travis_retry mvn $MAVEN_OVERRIDE verify -U
- travis_retry travis/test_wordcount.sh
diff --git a/pom.xml b/pom.xml
index ba130d25a3d2..de47ff5c4fa5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -91,6 +91,7 @@
   <packaging>pom</packaging>
   <modules>
     <module>sdk</module>
+    <module>runners</module>
     <module>examples</module>
     <module>maven-archetypes/starter</module>
     <module>maven-archetypes/examples</module>
diff --git a/runners/flink/README.md b/runners/flink/README.md
new file mode 100644
index 000000000000..0fee6f09cbf5
--- /dev/null
+++ b/runners/flink/README.md
@@ -0,0 +1,202 @@
+Flink Beam Runner (Flink-Runner)
+-------------------------------
+
+Flink-Runner is a runner for Apache Beam that enables you to
+run Beam dataflows with Apache Flink. It integrates seamlessly with the Beam
+API, allowing you to execute Apache Beam programs in streaming or batch mode.
+
+## Streaming
+
+### Full Beam Windowing and Triggering Semantics
+
+The Flink Beam Runner supports *Event Time*, allowing you to analyze data with respect to its
+associated timestamp. It handles out-of-order and late-arriving elements. You may leverage the full
+power of the Beam windowing semantics like *time-based*, *sliding*, *tumbling*, or *count*
+windows. You may build *session* windows which allow you to keep track of events associated with
+each other.
+
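+For illustration, here is a sketch of assigning sliding windows to a collection. It is not taken
+from the shipped examples; it assumes an existing `PCollection<String>` named `words` and uses the
+`Window` and `SlidingWindows` classes of the Dataflow SDK that this runner currently builds on:
+
+```java
+// Assign each element to one-minute windows sliding every 30 seconds,
+// based on the element's event-time timestamp.
+PCollection<String> windowed = words.apply(
+    Window.<String>into(SlidingWindows.of(Duration.standardMinutes(1))
+        .every(Duration.standardSeconds(30))));
+```
+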
+### Fault-Tolerance
+
+The program's state is persisted by Apache Flink. You may re-run and resume your program upon
+failure or if you decide to continue computation at a later time.
+
+### Sources and Sinks
+
+Build your own data ingestion or egress using the source/sink interface. Re-use Flink's sources
+and sinks or use the provided support for Apache Kafka.
+
+### Seamless integration
+
+To execute a Beam program in streaming mode, just enable streaming in the `PipelineOptions`:
+
+ options.setStreaming(true);
+
+That's it. If you prefer batched execution, simply disable streaming mode.
+
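+Putting it together, a minimal setup could look like the following sketch, using the
+`FlinkPipelineOptions` and `FlinkPipelineRunner` classes that ship with this runner:
+
+```java
+FlinkPipelineOptions options =
+    PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
+options.setRunner(FlinkPipelineRunner.class);
+options.setStreaming(true); // set to false for batch execution
+
+Pipeline p = Pipeline.create(options);
+// ... build your pipeline and call p.run() as usual
+```
+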
+## Batch
+
+### Batch optimization
+
+Flink provides out-of-core algorithms which operate on its managed memory to perform sorting,
+caching, and hash table operations. We have optimized operations like CoGroup to use Flink's
+out-of-core implementations.
+
+### Fault-Tolerance
+
+We guarantee job-level fault-tolerance that gracefully restarts failed batch jobs.
+
+### Sources and Sinks
+
+Build your own data ingestion or egress using the source/sink interface, or re-use Flink's
+sources and sinks.
+
+## Features
+
+The Flink Beam Runner maintains as much compatibility with the Beam API as possible. We
+support transformations on data like the following (see the sketch after this list):
+
+- Grouping
+- Windowing
+- ParDo
+- CoGroup
+- Flatten
+- Combine
+- Side inputs/outputs
+- Encoding
+
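+As a small taste of these transformations, here is a sketch (not one of the shipped examples) that
+counts elements with `Count` and formats the results with a `ParDo`, assuming an existing
+`PCollection<String>` named `words`:
+
+```java
+// Count occurrences of each distinct element.
+PCollection<KV<String, Long>> counts = words.apply(Count.<String>perElement());
+
+// Format each (word, count) pair as a line of text.
+PCollection<String> formatted = counts.apply(
+    ParDo.of(new DoFn<KV<String, Long>, String>() {
+      @Override
+      public void processElement(ProcessContext c) {
+        c.output(c.element().getKey() + ": " + c.element().getValue());
+      }
+    }));
+```
+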
+# Getting Started
+
+To get started using the Flink Runner, we first need to install the latest version.
+
+## Install Flink-Runner
+
+To retrieve the latest version of Flink-Runner, run the following command:
+
+ git clone https://github.com/apache/incubator-beam
+
+Then switch to the newly created directory and run Maven to build the Beam runner:
+
+ cd incubator-beam
+ mvn clean install -DskipTests
+
+Flink-Runner is now installed in your local Maven repository.
+
+## Executing an example
+
+Next, let's run the classic WordCount example. It's semantically identical to
+the example provided with Apache Beam. Only this time, we choose the
+`FlinkPipelineRunner` to execute the WordCount on top of Flink.
+
+Here's an excerpt from the WordCount class file (the full class ships with the runner as
+`org.apache.beam.runners.flink.examples.WordCount`):
+
+```java
+Options options = PipelineOptionsFactory.fromArgs(args).as(Options.class);
+// yes, we want to run WordCount with Flink
+options.setRunner(FlinkPipelineRunner.class);
+
+Pipeline p = Pipeline.create(options);
+
+p.apply(TextIO.Read.named("ReadLines").from(options.getInput()))
+ .apply(new CountWords())
+ .apply(TextIO.Write.named("WriteCounts")
+ .to(options.getOutput())
+ .withNumShards(options.getNumShards()));
+
+p.run();
+```
+
+To execute the example, let's first get some sample data:
+
+ curl http://www.gutenberg.org/cache/epub/1128/pg1128.txt > kinglear.txt
+
+Then let's run the included WordCount locally on your machine:
+
+ mvn exec:exec -Dinput=kinglear.txt -Doutput=wordcounts.txt
+
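+This works because the runner's `pom.xml` wires `mvn exec:exec` to launch the class given by its
+`clazz` property with `--input`, `--output`, and `--parallelism` arguments. All of these
+properties have defaults, so you can override any subset of them, e.g. to run with four parallel
+tasks:
+
+    mvn exec:exec -Dinput=kinglear.txt -Doutput=wordcounts.txt -Dparallelism=4
+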
+Congratulations, you have run your first Apache Beam program on top of Apache Flink!
+
+
+# Running Beam programs on a Flink cluster
+
+You can run your Beam program on an Apache Flink cluster. Start by creating a new
+Maven project:
+
+ mvn archetype:generate -DgroupId=com.mycompany.beam -DartifactId=beam-test \
+ -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false
+
+The contents of the root `pom.xml` should be slightly changed afterwards (explanation below):
+
+```xml
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <groupId>com.mycompany.beam</groupId>
+  <artifactId>beam-test</artifactId>
+  <version>1.0</version>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>flink-runner</artifactId>
+      <version>0.2</version>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <version>2.4.1</version>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <transformers>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                  <mainClass>WordCount</mainClass>
+                </transformer>
+              </transformers>
+              <artifactSet>
+                <excludes>
+                  <exclude>org.apache.flink:*</exclude>
+                </excludes>
+              </artifactSet>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>
+```
+
+The following changes have been made:
+
+1. The Flink Beam Runner was added as a dependency.
+
+2. The Maven Shade plugin was added to build a fat jar.
+
+A fat jar is necessary if you want to submit your Beam code to a Flink cluster. The fat jar
+includes not only your program code but also the Beam code that is necessary at runtime. Note
+that this step is needed because the Beam Runner is not part of Flink.
+
+You can then build the jar using `mvn clean package`. Submit the fat jar from the `target`
+folder to the Flink cluster using the command-line utility like so:
+
+ ./bin/flink run /path/to/fat.jar
+
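+Arguments for your program can be appended after the jar path. For instance, assuming your main
+class parses `--input` and `--output` the way the WordCount example does:
+
+    ./bin/flink run /path/to/fat.jar --input=kinglear.txt --output=wordcounts.txt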
+
+# More
+
+For more information, please visit the [Apache Flink website](http://flink.apache.org) or reach
+out on the [mailing lists](http://flink.apache.org/community.html#mailing-lists).
\ No newline at end of file
diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
new file mode 100644
index 000000000000..2110c2c8023f
--- /dev/null
+++ b/runners/flink/pom.xml
@@ -0,0 +1,264 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>runners</artifactId>
+    <version>1.5.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>flink-runner</artifactId>
+  <version>0.3-SNAPSHOT</version>
+
+  <name>Flink Beam Runner</name>
+  <packaging>jar</packaging>
+
+  <inceptionYear>2015</inceptionYear>
+
+  <licenses>
+    <license>
+      <name>The Apache Software License, Version 2.0</name>
+      <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+      <distribution>repo</distribution>
+    </license>
+  </licenses>
+
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+    <flink.version>1.0-SNAPSHOT</flink.version>
+    <beam.version>1.5.0-SNAPSHOT</beam.version>
+    <!-- Default parameters for mvn exec:exec -->
+    <clazz>org.apache.beam.runners.flink.examples.WordCount</clazz>
+    <input>kinglear.txt</input>
+    <output>wordcounts.txt</output>
+    <parallelism>1</parallelism>
+  </properties>
+
+  <repositories>
+    <repository>
+      <id>apache.snapshots</id>
+      <name>Apache Development Snapshot Repository</name>
+      <url>https://repository.apache.org/content/repositories/snapshots/</url>
+      <releases>
+        <enabled>false</enabled>
+      </releases>
+      <snapshots>
+        <enabled>true</enabled>
+      </snapshots>
+    </repository>
+  </repositories>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-core</artifactId>
+      <version>${flink.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-streaming-java_2.10</artifactId>
+      <version>${flink.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-streaming-java_2.10</artifactId>
+      <version>${flink.version}</version>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-java</artifactId>
+      <version>${flink.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-avro_2.10</artifactId>
+      <version>${flink.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-clients_2.10</artifactId>
+      <version>${flink.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-test-utils_2.10</artifactId>
+      <version>${flink.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-connector-kafka-0.8_2.10</artifactId>
+      <version>${flink.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-avro</artifactId>
+      <version>${flink.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.google.cloud.dataflow</groupId>
+      <artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
+      <version>${beam.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-jdk14</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <version>1.9.5</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <version>2.6</version>
+        <configuration>
+          <archive>
+            <manifest>
+              <addDefaultImplementationEntries>true</addDefaultImplementationEntries>
+              <addDefaultSpecificationEntries>true</addDefaultSpecificationEntries>
+            </manifest>
+          </archive>
+        </configuration>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>3.1</version>
+        <configuration>
+          <source>1.7</source>
+          <target>1.7</target>
+        </configuration>
+      </plugin>
+
+      <plugin>
+        <artifactId>maven-failsafe-plugin</artifactId>
+        <version>2.17</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>integration-test</goal>
+              <goal>verify</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <argLine>-Dlog4j.configuration=log4j-test.properties -XX:-UseGCOverheadLimit</argLine>
+        </configuration>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <version>2.17</version>
+        <configuration>
+          <argLine>-Dlog4j.configuration=log4j-test.properties -XX:-UseGCOverheadLimit</argLine>
+        </configuration>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-eclipse-plugin</artifactId>
+        <version>2.8</version>
+        <configuration>
+          <classpathContainers>
+            <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
+          </classpathContainers>
+          <downloadSources>true</downloadSources>
+          <downloadJavadocs>true</downloadJavadocs>
+        </configuration>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-enforcer-plugin</artifactId>
+        <version>1.3.1</version>
+        <executions>
+          <execution>
+            <id>enforce-maven</id>
+            <goals>
+              <goal>enforce</goal>
+            </goals>
+            <configuration>
+              <rules>
+                <requireJavaVersion>
+                  <version>[1.7,)</version>
+                </requireJavaVersion>
+                <requireMavenVersion>
+                  <version>[3.0.3,)</version>
+                </requireMavenVersion>
+              </rules>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>exec-maven-plugin</artifactId>
+        <version>1.2.1</version>
+        <executions>
+          <execution>
+            <phase>none</phase>
+          </execution>
+        </executions>
+        <configuration>
+          <executable>java</executable>
+          <arguments>
+            <argument>-classpath</argument>
+            <classpath/>
+            <argument>${clazz}</argument>
+            <argument>--input=${input}</argument>
+            <argument>--output=${output}</argument>
+            <argument>--parallelism=${parallelism}</argument>
+          </arguments>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
new file mode 100644
index 000000000000..8825ed36dee0
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import org.apache.beam.runners.flink.translation.FlinkPipelineTranslator;
+import org.apache.beam.runners.flink.translation.FlinkBatchPipelineTranslator;
+import org.apache.beam.runners.flink.translation.FlinkStreamingPipelineTranslator;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.java.CollectionEnvironment;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * The class that instantiates and manages the execution of a given job.
+ * Depending on whether the job is a streaming or a batch one, it creates
+ * the adequate execution environment ({@link ExecutionEnvironment} or {@link StreamExecutionEnvironment})
+ * and the necessary {@link FlinkPipelineTranslator} ({@link FlinkBatchPipelineTranslator} or
+ * {@link FlinkStreamingPipelineTranslator}) to transform the Beam job into a Flink one, and
+ * executes the (translated) job.
+ */
+public class FlinkPipelineExecutionEnvironment {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkPipelineExecutionEnvironment.class);
+
+ private final FlinkPipelineOptions options;
+
+ /**
+ * The Flink Batch execution environment. This is instantiated to either a
+ * {@link org.apache.flink.api.java.CollectionEnvironment},
+ * a {@link org.apache.flink.api.java.LocalEnvironment} or
+ * a {@link org.apache.flink.api.java.RemoteEnvironment}, depending on the configuration
+ * options.
+ */
+ private ExecutionEnvironment flinkBatchEnv;
+
+
+ /**
+ * The Flink Streaming execution environment. This is instantiated to either a
+ * {@link org.apache.flink.streaming.api.environment.LocalStreamEnvironment} or
+ * a {@link org.apache.flink.streaming.api.environment.RemoteStreamEnvironment}, depending
+ * on the configuration options, and more specifically, the url of the master.
+ */
+ private StreamExecutionEnvironment flinkStreamEnv;
+
+ /**
+ * Translator for this FlinkPipelineRunner. Its role is to translate the Beam operators to
+ * their Flink counterparts. Based on the options provided by the user, if we have a streaming job,
+ * this is instantiated as a {@link FlinkStreamingPipelineTranslator}; otherwise, i.e. for a batch job,
+ * a {@link FlinkBatchPipelineTranslator} is created.
+ */
+ private FlinkPipelineTranslator flinkPipelineTranslator;
+
+ /**
+ * Creates a {@link FlinkPipelineExecutionEnvironment} with the user-specified parameters in the
+ * provided {@link FlinkPipelineOptions}.
+ *
+ * @param options the user-defined pipeline options.
+ * */
+ public FlinkPipelineExecutionEnvironment(FlinkPipelineOptions options) {
+ this.options = Preconditions.checkNotNull(options);
+ this.createPipelineExecutionEnvironment();
+ this.createPipelineTranslator();
+ }
+
+ /**
+ * Depending on the type of job (Streaming or Batch) and the user-specified options,
+ * this method creates the adequate ExecutionEnvironment.
+ */
+ private void createPipelineExecutionEnvironment() {
+ if (options.isStreaming()) {
+ createStreamExecutionEnvironment();
+ } else {
+ createBatchExecutionEnvironment();
+ }
+ }
+
+ /**
+ * Depending on the type of job (Streaming or Batch), this method creates the adequate job graph
+ * translator. In the case of batch, it will work with {@link org.apache.flink.api.java.DataSet},
+ * while for streaming, it will work with {@link org.apache.flink.streaming.api.datastream.DataStream}.
+ */
+ private void createPipelineTranslator() {
+ checkInitializationState();
+ if (this.flinkPipelineTranslator != null) {
+ throw new IllegalStateException("FlinkPipelineTranslator already initialized.");
+ }
+
+ this.flinkPipelineTranslator = options.isStreaming() ?
+ new FlinkStreamingPipelineTranslator(flinkStreamEnv, options) :
+ new FlinkBatchPipelineTranslator(flinkBatchEnv, options);
+ }
+
+ /**
+ * Depending on whether the job is a streaming or a batch one, this method creates
+ * the necessary execution environment and pipeline translator, and translates
+ * the {@link com.google.cloud.dataflow.sdk.values.PCollection} program into
+ * a {@link org.apache.flink.api.java.DataSet} or {@link org.apache.flink.streaming.api.datastream.DataStream}
+ * one.
+ * */
+ public void translate(Pipeline pipeline) {
+ checkInitializationState();
+ if (this.flinkBatchEnv == null && this.flinkStreamEnv == null) {
+ createPipelineExecutionEnvironment();
+ }
+ if (this.flinkPipelineTranslator == null) {
+ createPipelineTranslator();
+ }
+ this.flinkPipelineTranslator.translate(pipeline);
+ }
+
+ /**
+ * Launches the program execution.
+ * */
+ public JobExecutionResult executePipeline() throws Exception {
+ if (options.isStreaming()) {
+ if (this.flinkStreamEnv == null) {
+ throw new RuntimeException("FlinkPipelineExecutionEnvironment not initialized.");
+ }
+ if (this.flinkPipelineTranslator == null) {
+ throw new RuntimeException("FlinkPipelineTranslator not initialized.");
+ }
+ return this.flinkStreamEnv.execute();
+ } else {
+ if (this.flinkBatchEnv == null) {
+ throw new RuntimeException("FlinkPipelineExecutionEnvironment not initialized.");
+ }
+ if (this.flinkPipelineTranslator == null) {
+ throw new RuntimeException("FlinkPipelineTranslator not initialized.");
+ }
+ return this.flinkBatchEnv.execute();
+ }
+ }
+
+ /**
+ * If the submitted job is a batch processing job, this method creates the adequate
+ * Flink {@link org.apache.flink.api.java.ExecutionEnvironment} depending
+ * on the user-specified options.
+ */
+ private void createBatchExecutionEnvironment() {
+ if (this.flinkStreamEnv != null || this.flinkBatchEnv != null) {
+ throw new RuntimeException("FlinkPipelineExecutionEnvironment already initialized.");
+ }
+
+ LOG.info("Creating the required Batch Execution Environment.");
+
+ String masterUrl = options.getFlinkMaster();
+ this.flinkStreamEnv = null;
+
+ // depending on the master, create the right environment.
+ if (masterUrl.equals("[local]")) {
+ this.flinkBatchEnv = ExecutionEnvironment.createLocalEnvironment();
+ } else if (masterUrl.equals("[collection]")) {
+ this.flinkBatchEnv = new CollectionEnvironment();
+ } else if (masterUrl.equals("[auto]")) {
+ this.flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment();
+ } else if (masterUrl.matches(".*:\\d*")) {
+ String[] parts = masterUrl.split(":");
+ List<String> stagingFiles = options.getFilesToStage();
+ this.flinkBatchEnv = ExecutionEnvironment.createRemoteEnvironment(parts[0],
+ Integer.parseInt(parts[1]),
+ stagingFiles.toArray(new String[stagingFiles.size()]));
+ } else {
+ LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", masterUrl);
+ this.flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment();
+ }
+
+ // set the correct parallelism.
+ if (options.getParallelism() != -1 && !(this.flinkBatchEnv instanceof CollectionEnvironment)) {
+ this.flinkBatchEnv.setParallelism(options.getParallelism());
+ }
+
+ // set parallelism in the options (required by some execution code)
+ options.setParallelism(flinkBatchEnv.getParallelism());
+ }
+
+ /**
+ * If the submitted job is a stream processing job, this method creates the adequate
+ * Flink {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment} depending
+ * on the user-specified options.
+ */
+ private void createStreamExecutionEnvironment() {
+ if (this.flinkStreamEnv != null || this.flinkBatchEnv != null) {
+ throw new RuntimeException("FlinkPipelineExecutionEnvironment already initialized.");
+ }
+
+ LOG.info("Creating the required Streaming Environment.");
+
+ String masterUrl = options.getFlinkMaster();
+ this.flinkBatchEnv = null;
+
+ // depending on the master, create the right environment.
+ if (masterUrl.equals("[local]")) {
+ this.flinkStreamEnv = StreamExecutionEnvironment.createLocalEnvironment();
+ } else if (masterUrl.equals("[auto]")) {
+ this.flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+ } else if (masterUrl.matches(".*:\\d*")) {
+ String[] parts = masterUrl.split(":");
+ List<String> stagingFiles = options.getFilesToStage();
+ this.flinkStreamEnv = StreamExecutionEnvironment.createRemoteEnvironment(parts[0],
+ Integer.parseInt(parts[1]), stagingFiles.toArray(new String[stagingFiles.size()]));
+ } else {
+ LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", masterUrl);
+ this.flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+ }
+
+ // set the correct parallelism.
+ if (options.getParallelism() != -1) {
+ this.flinkStreamEnv.setParallelism(options.getParallelism());
+ }
+
+ // set parallelism in the options (required by some execution code)
+ options.setParallelism(flinkStreamEnv.getParallelism());
+
+ // default to event time
+ this.flinkStreamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+ // for the following 2 parameters, a value of -1 means that Flink will use
+ // the default values as specified in the configuration.
+ int numRetries = options.getNumberOfExecutionRetries();
+ if (numRetries != -1) {
+ this.flinkStreamEnv.setNumberOfExecutionRetries(numRetries);
+ }
+ long retryDelay = options.getExecutionRetryDelay();
+ if (retryDelay != -1) {
+ this.flinkStreamEnv.getConfig().setExecutionRetryDelay(retryDelay);
+ }
+
+ // A value of -1 corresponds to disabled checkpointing (see CheckpointConfig in Flink).
+ // If the value is not -1, then the validity checks are applied.
+ // By default, checkpointing is disabled.
+ long checkpointInterval = options.getCheckpointingInterval();
+ if (checkpointInterval != -1) {
+ if (checkpointInterval < 1) {
+ throw new IllegalArgumentException("The checkpoint interval must be positive");
+ }
+ this.flinkStreamEnv.enableCheckpointing(checkpointInterval);
+ }
+ }
+
+ private void checkInitializationState() {
+ if (options.isStreaming() && this.flinkBatchEnv != null) {
+ throw new IllegalStateException("Attempted to run a Streaming Job with a Batch Execution Environment.");
+ } else if (!options.isStreaming() && this.flinkStreamEnv != null) {
+ throw new IllegalStateException("Attempted to run a Batch Job with a Streaming Execution Environment.");
+ }
+ }
+}
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
new file mode 100644
index 000000000000..2f4b3ea47457
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.cloud.dataflow.sdk.options.ApplicationNameOptions;
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.StreamingOptions;
+
+import java.util.List;
+
+/**
+ * Options which can be used to configure a Flink PipelineRunner.
+ */
+public interface FlinkPipelineOptions extends PipelineOptions, ApplicationNameOptions, StreamingOptions {
+
+ /**
+ * List of local files to make available to workers.
+ *
+ * Jars are placed on the worker's classpath.
+ *
+ * The default value is the list of jars from the main program's classpath.
+ */
+ @Description("Jar-Files to send to all workers and put on the classpath. " +
+ "The default value is all files from the classpath.")
+ @JsonIgnore
+ List<String> getFilesToStage();
+ void setFilesToStage(List<String> value);
+
+ /**
+ * The job name is used to identify jobs running on a Flink cluster.
+ */
+ @Description("Dataflow job name, to uniquely identify active jobs. "
+ + "Defaults to using the ApplicationName-UserName-Date.")
+ @Default.InstanceFactory(DataflowPipelineOptions.JobNameFactory.class)
+ String getJobName();
+ void setJobName(String value);
+
+ /**
+ * The url of the Flink JobManager on which to execute pipelines. This can either be
+ * the address of a cluster JobManager, in the form "host:port" or one of the special
+ * Strings "[local]", "[collection]" or "[auto]". "[local]" will start a local Flink
+ * Cluster in the JVM, "[collection]" will execute the pipeline on Java Collections while
+ * "[auto]" will let the system decide where to execute the pipeline based on the environment.
+ */
+ @Description("Address of the Flink Master where the Pipeline should be executed. Can" +
+ " either be of the form \"host:port\" or one of the special values [local], " +
+ "[collection] or [auto].")
+ String getFlinkMaster();
+ void setFlinkMaster(String value);
+
+ @Description("The degree of parallelism to be used when distributing operations onto workers.")
+ @Default.Integer(-1)
+ Integer getParallelism();
+ void setParallelism(Integer value);
+
+ @Description("The interval between consecutive checkpoints (i.e. snapshots of the current pipeline state used for " +
+ "fault tolerance).")
+ @Default.Long(-1L)
+ Long getCheckpointingInterval();
+ void setCheckpointingInterval(Long interval);
+
+ @Description("Sets the number of times that failed tasks are re-executed. " +
+ "A value of zero effectively disables fault tolerance. A value of -1 indicates " +
+ "that the system default value (as defined in the configuration) should be used.")
+ @Default.Integer(-1)
+ Integer getNumberOfExecutionRetries();
+ void setNumberOfExecutionRetries(Integer retries);
+
+ @Description("Sets the delay between executions. A value of {@code -1} indicates that the default value should be used.")
+ @Default.Long(-1L)
+ Long getExecutionRetryDelay();
+ void setExecutionRetryDelay(Long delay);
+}
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
new file mode 100644
index 000000000000..fe773d98ad39
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsValidator;
+import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
+import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.values.PInput;
+import com.google.cloud.dataflow.sdk.values.POutput;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A {@link PipelineRunner} that executes the operations in the
+ * pipeline by first translating them to a Flink Plan and then executing them either locally
+ * or on a Flink cluster, depending on the configuration.
+ *
+ * This is based on {@link com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner}.
+ */
+public class FlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkPipelineRunner.class);
+
+ /**
+ * Provided options.
+ */
+ private final FlinkPipelineOptions options;
+
+ private final FlinkPipelineExecutionEnvironment flinkJobEnv;
+
+ /**
+ * Construct a runner from the provided options.
+ *
+ * @param options Properties which configure the runner.
+ * @return The newly created runner.
+ */
+ public static FlinkPipelineRunner fromOptions(PipelineOptions options) {
+ FlinkPipelineOptions flinkOptions =
+ PipelineOptionsValidator.validate(FlinkPipelineOptions.class, options);
+ ArrayList<String> missing = new ArrayList<>();
+
+ if (flinkOptions.getAppName() == null) {
+ missing.add("appName");
+ }
+ if (missing.size() > 0) {
+ throw new IllegalArgumentException(
+ "Missing required values: " + Joiner.on(',').join(missing));
+ }
+
+ if (flinkOptions.getFilesToStage() == null) {
+ flinkOptions.setFilesToStage(detectClassPathResourcesToStage(
+ DataflowPipelineRunner.class.getClassLoader()));
+ LOG.info("PipelineOptions.filesToStage was not specified. "
+ + "Defaulting to files from the classpath: will stage {} files. "
+ + "Enable logging at DEBUG level to see which files will be staged.",
+ flinkOptions.getFilesToStage().size());
+ LOG.debug("Classpath elements: {}", flinkOptions.getFilesToStage());
+ }
+
+ // Verify jobName according to service requirements.
+ String jobName = flinkOptions.getJobName().toLowerCase();
+ Preconditions.checkArgument(jobName.matches("[a-z]([-a-z0-9]*[a-z0-9])?"), "JobName invalid; " +
+ "the name must consist of only the characters " + "[-a-z0-9], starting with a letter " +
+ "and ending with a letter " + "or number");
+ Preconditions.checkArgument(jobName.length() <= 40,
+ "JobName too long; must be no more than 40 characters in length");
+
+ // Set Flink Master to [auto] if no option was specified.
+ if (flinkOptions.getFlinkMaster() == null) {
+ flinkOptions.setFlinkMaster("[auto]");
+ }
+
+ return new FlinkPipelineRunner(flinkOptions);
+ }
+
+ private FlinkPipelineRunner(FlinkPipelineOptions options) {
+ this.options = options;
+ this.flinkJobEnv = new FlinkPipelineExecutionEnvironment(options);
+ }
+
+ @Override
+ public FlinkRunnerResult run(Pipeline pipeline) {
+ LOG.info("Executing pipeline using FlinkPipelineRunner.");
+
+ LOG.info("Translating pipeline to Flink program.");
+
+ this.flinkJobEnv.translate(pipeline);
+
+ LOG.info("Starting execution of Flink program.");
+
+ JobExecutionResult result;
+ try {
+ result = this.flinkJobEnv.executePipeline();
+ } catch (Exception e) {
+ LOG.error("Pipeline execution failed", e);
+ throw new RuntimeException("Pipeline execution failed", e);
+ }
+
+ LOG.info("Execution finished in {} msecs", result.getNetRuntime());
+
+ Map<String, Object> accumulators = result.getAllAccumulatorResults();
+ if (accumulators != null && !accumulators.isEmpty()) {
+ LOG.info("Final aggregator values:");
+
+ for (Map.Entry<String, Object> entry : result.getAllAccumulatorResults().entrySet()) {
+ LOG.info("{} : {}", entry.getKey(), entry.getValue());
+ }
+ }
+
+ return new FlinkRunnerResult(accumulators, result.getNetRuntime());
+ }
+
+ /**
+ * For testing.
+ */
+ public FlinkPipelineOptions getPipelineOptions() {
+ return options;
+ }
+
+ /**
+ * Constructs a runner with default properties for testing.
+ *
+ * @return The newly created runner.
+ */
+ public static FlinkPipelineRunner createForTest(boolean streaming) {
+ FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ // we use [auto] for testing since this will make it pick up the Testing
+ // ExecutionEnvironment
+ options.setFlinkMaster("[auto]");
+ options.setStreaming(streaming);
+ return new FlinkPipelineRunner(options);
+ }
+
+ @Override
+ public