diff --git a/runners/flink/README.md b/runners/flink/README.md index 334811901a68..aeb16922a96b 100644 --- a/runners/flink/README.md +++ b/runners/flink/README.md @@ -109,35 +109,40 @@ Next, let's run the classic WordCount example. It's semantically identically to the example provided with Apache Beam. Only this time, we chose the `FlinkRunner` to execute the WordCount on top of Flink. -Here's an excerpt from the WordCount class file: +Here's an excerpt from the [WordCount class file](examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java): ```java -Options options = PipelineOptionsFactory.fromArgs(args).as(Options.class); +Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); + // yes, we want to run WordCount with Flink options.setRunner(FlinkRunner.class); Pipeline p = Pipeline.create(options); -p.apply(TextIO.Read.named("ReadLines").from(options.getInput())) - .apply(new CountWords()) - .apply(TextIO.Write.named("WriteCounts") - .to(options.getOutput()) - .withNumShards(options.getNumShards())); +p.apply("ReadLines", TextIO.Read.from(options.getInput())) + .apply(new CountWords()) + .apply(MapElements.via(new FormatAsTextFn())) + .apply("WriteCounts", TextIO.Write.to(options.getOutput())); p.run(); ``` To execute the example, let's first get some sample data: - curl http://www.gutenberg.org/cache/epub/1128/pg1128.txt > examples/kinglear.txt + cd runners/flink/examples + curl http://www.gutenberg.org/cache/epub/1128/pg1128.txt > kinglear.txt Then let's run the included WordCount locally on your machine: - cd examples - mvn exec:exec -Dinput=kinglear.txt -Doutput=wordcounts.txt + cd runners/flink/examples + mvn exec:java -Dexec.mainClass=org.apache.beam.runners.flink.examples.WordCount \ + -Dinput=kinglear.txt -Doutput=wordcounts.txt Congratulations, you have run your first Apache Beam program on top of Apache Flink! +Note, that you will find a number of `wordcounts*` output files because Flink parallelizes the +WordCount computation. You may pass an additional `-Dparallelism=1` to disable parallelization and +get a single `wordcounts.txt` file. # Running Beam programs on a Flink cluster diff --git a/runners/flink/examples/pom.xml b/runners/flink/examples/pom.xml index b0ee2ed04e68..355a6bed88aa 100644 --- a/runners/flink/examples/pom.xml +++ b/runners/flink/examples/pom.xml @@ -33,11 +33,10 @@ jar - - org.apache.beam.runners.flink.examples.WordCount + kinglear.txt wordcounts.txt - 1 + -1 @@ -131,12 +130,10 @@ java - -classpath - - ${clazz} + --runner=org.apache.beam.runners.flink.FlinkRunner + --parallelism=${parallelism} --input=${input} --output=${output} - --parallelism=${parallelism} diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java index 2d95c978b3ff..c54229d37aa3 100644 --- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java +++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java @@ -21,10 +21,10 @@ import org.apache.beam.runners.flink.FlinkRunner; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.options.Validation; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.DoFn; @@ -92,11 +92,11 @@ public String apply(KV input) { */ public interface Options extends PipelineOptions, FlinkPipelineOptions { @Description("Path of the file to read from") - @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt") String getInput(); void setInput(String value); @Description("Path of the file to write to") + @Validation.Required String getOutput(); void setOutput(String value); }