Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 15 additions & 10 deletions runners/flink/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,35 +109,40 @@ Next, let's run the classic WordCount example. It's semantically identical to
the example provided with Apache Beam. Only this time, we chose the
`FlinkRunner` to execute the WordCount on top of Flink.

Here's an excerpt from the WordCount class file:
Here's an excerpt from the [WordCount class file](examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java):

```java
Options options = PipelineOptionsFactory.fromArgs(args).as(Options.class);
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);

// yes, we want to run WordCount with Flink
options.setRunner(FlinkRunner.class);

Pipeline p = Pipeline.create(options);

p.apply(TextIO.Read.named("ReadLines").from(options.getInput()))
.apply(new CountWords())
.apply(TextIO.Write.named("WriteCounts")
.to(options.getOutput())
.withNumShards(options.getNumShards()));
p.apply("ReadLines", TextIO.Read.from(options.getInput()))
.apply(new CountWords())
.apply(MapElements.via(new FormatAsTextFn()))
.apply("WriteCounts", TextIO.Write.to(options.getOutput()));

p.run();
```

To execute the example, let's first get some sample data:

curl http://www.gutenberg.org/cache/epub/1128/pg1128.txt > examples/kinglear.txt
cd runners/flink/examples
curl http://www.gutenberg.org/cache/epub/1128/pg1128.txt > kinglear.txt

Then let's run the included WordCount locally on your machine:

cd examples
mvn exec:exec -Dinput=kinglear.txt -Doutput=wordcounts.txt
cd runners/flink/examples
mvn exec:java -Dexec.mainClass=org.apache.beam.runners.flink.examples.WordCount \
-Dinput=kinglear.txt -Doutput=wordcounts.txt

Congratulations, you have run your first Apache Beam program on top of Apache Flink!

Note that you will find a number of `wordcounts*` output files because Flink parallelizes the
WordCount computation. You may pass an additional `-Dparallelism=1` to disable parallelization and
get a single `wordcounts.txt` file.

# Running Beam programs on a Flink cluster

Expand Down
11 changes: 4 additions & 7 deletions runners/flink/examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,10 @@
<packaging>jar</packaging>

<properties>
<!-- Default parameters for mvn exec:exec -->
<clazz>org.apache.beam.runners.flink.examples.WordCount</clazz>
<!-- Default parameters for mvn exec:java -->
<input>kinglear.txt</input>
<output>wordcounts.txt</output>
<parallelism>1</parallelism>
<parallelism>-1</parallelism>
</properties>

<profiles>
Expand Down Expand Up @@ -131,12 +130,10 @@
<configuration>
<executable>java</executable>
<arguments>
<argument>-classpath</argument>
<classpath />
<argument>${clazz}</argument>
<argument>--runner=org.apache.beam.runners.flink.FlinkRunner</argument>
<argument>--parallelism=${parallelism}</argument>
<argument>--input=${input}</argument>
<argument>--output=${output}</argument>
<argument>--parallelism=${parallelism}</argument>
</arguments>
</configuration>
</plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
Expand Down Expand Up @@ -92,11 +92,11 @@ public String apply(KV<String, Long> input) {
*/
public interface Options extends PipelineOptions, FlinkPipelineOptions {
@Description("Path of the file to read from")
@Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
String getInput();
void setInput(String value);

@Description("Path of the file to write to")
@Validation.Required
String getOutput();
void setOutput(String value);
}
Expand Down