diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index b1929d217068..878f573128c1 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -72,6 +72,7 @@
+
org.apache.flink
flink-core
@@ -82,13 +83,6 @@
flink-streaming-java_${scala.major.version}
${flink.version}
-
- org.apache.flink
- flink-streaming-java_${scala.major.version}
- ${flink.version}
- test
- test-jar
-
org.apache.flink
flink-java
@@ -99,12 +93,6 @@
flink-clients_${scala.major.version}
${flink.version}
-
- org.apache.flink
- flink-test-utils_${scala.major.version}
- ${flink.version}
- test
-
org.apache.flink
flink-connector-kafka-0.8_${scala.major.version}
@@ -126,12 +114,35 @@
+
+
+
+ com.google.auto.service
+ auto-service
+ 1.0-rc2
+ true
+
+
+
org.mockito
mockito-all
1.9.5
test
+
+ org.apache.flink
+ flink-streaming-java_${scala.major.version}
+ ${flink.version}
+ test
+ test-jar
+
+
+ org.apache.flink
+ flink-test-utils_${scala.major.version}
+ ${flink.version}
+ test
+
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java
new file mode 100644
index 000000000000..69904a4655c9
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java
@@ -0,0 +1,38 @@
+package org.apache.beam.runners.flink;
+
+import com.google.auto.service.AutoService;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import org.apache.beam.runners.flink.FlinkPipelineRunner;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar;
+import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
+import com.google.cloud.dataflow.sdk.runners.PipelineRunnerRegistrar;
+import com.google.common.collect.ImmutableList;
+
+
+/**
+ * AuteService registrar - will register FlinkRunner and FlinkOptions
+ * as possible pipeline runner services.
+ *
+ * It ends up in META-INF/services and gets picked up by Dataflow.
+ *
+ */
+public class FlinkRunnerRegistrar {
+ private FlinkRunnerRegistrar() { }
+
+ @AutoService(PipelineRunnerRegistrar.class)
+ public static class Runner implements PipelineRunnerRegistrar {
+ @Override
+ public Iterable>> getPipelineRunners() {
+ return ImmutableList.>>of(FlinkPipelineRunner.class);
+ }
+ }
+
+ @AutoService(PipelineOptionsRegistrar.class)
+ public static class Options implements PipelineOptionsRegistrar {
+ @Override
+ public Iterable> getPipelineOptions() {
+ return ImmutableList.>of(FlinkPipelineOptions.class);
+ }
+ }
+}