input) {
+ return input.getKey() + ": " + input.getValue();
+ }
+ }
+
+ /**
+ * A PTransform that converts a PCollection containing lines of text into a PCollection of
+ * formatted word counts.
+ *
+ * Concept #3: This is a custom composite transform that bundles two transforms (ParDo and
+ * Count) as a reusable PTransform subclass. Using composite transforms allows for easy reuse,
+ * modular testing, and an improved monitoring experience.
+ */
+ public static class CountWords extends PTransform<PCollection<String>,
+ PCollection<KV<String, Long>>> {
+ @Override
+ public PCollection<KV<String, Long>> apply(PCollection<String> lines) {
+
+ // Convert lines of text into individual words.
+ PCollection<String> words = lines.apply(
+ ParDo.of(new ExtractWordsFn()));
+
+ // Count the number of times each word occurs.
+ PCollection<KV<String, Long>> wordCounts =
+ words.apply(Count.<String>perElement());
+
+ return wordCounts;
+ }
+ }
+
@Rule
public final TemporaryFolder tmpDir = new TemporaryFolder();
@@ -76,8 +143,8 @@ public void testText() throws Exception {
options.setRunner(SparkPipelineRunner.class);
Pipeline p = Pipeline.create(options);
PCollection<String> inputWords = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());
- PCollection<String> output = inputWords.apply(new WordCount.CountWords())
- .apply(MapElements.via(new WordCount.FormatAsTextFn()));
+ PCollection<String> output = inputWords.apply(new CountWords())
+ .apply(MapElements.via(new FormatAsTextFn()));
output.apply(TextIO.Write.to(outputDir.getAbsolutePath()).withNumShards(3).withSuffix(".txt"));
EvaluationResult res = SparkPipelineRunner.create().run(p);
res.close();