11 changes: 0 additions & 11 deletions runners/spark/pom.xml
@@ -221,17 +221,6 @@
         </exclusion>
       </exclusions>
     </dependency>
-    <dependency>
-      <groupId>org.apache.beam</groupId>
-      <artifactId>beam-examples-java</artifactId>
-      <exclusions>
-        <!-- Use Hadoop/Spark's backend logger instead of jdk14 for tests -->
-        <exclusion>
-          <groupId>org.slf4j</groupId>
-          <artifactId>slf4j-jdk14</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
     <dependency>
       <groupId>org.apache.avro</groupId>
       <artifactId>avro-mapred</artifactId>
137 changes: 137 additions & 0 deletions WordCount.java (new file, package org.apache.beam.runners.spark.examples)
@@ -0,0 +1,137 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.spark.examples;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

/**
 * Duplicated from beam-examples to avoid a module dependency on beam-examples.
*/
public class WordCount {

  /**
   * Concept #2: You can make your pipeline code less verbose by defining your DoFns statically
   * out-of-line. This DoFn tokenizes lines of text into individual words; we pass it to a ParDo
   * in the pipeline.
   */
  static class ExtractWordsFn extends DoFn<String, String> {
    private final Aggregator<Long, Long> emptyLines =
        createAggregator("emptyLines", new Sum.SumLongFn());

    @Override
    public void processElement(ProcessContext c) {
      // Count completely blank lines via the emptyLines aggregator.
      if (c.element().trim().isEmpty()) {
        emptyLines.addValue(1L);
      }

      // Split the line into words.
      String[] words = c.element().split("[^a-zA-Z']+");

      // Output each word encountered into the output PCollection.
      for (String word : words) {
        if (!word.isEmpty()) {
          c.output(word);
        }
      }
    }
  }
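
  // For contrast with Concept #2, the same tokenization could be written as an
  // anonymous inline DoFn. This is only an illustrative sketch; the pipeline in
  // main() uses the out-of-line ExtractWordsFn above:
  //
  //   lines.apply(ParDo.of(new DoFn<String, String>() {
  //     @Override
  //     public void processElement(ProcessContext c) {
  //       for (String word : c.element().split("[^a-zA-Z']+")) {
  //         if (!word.isEmpty()) {
  //           c.output(word);
  //         }
  //       }
  //     }
  //   }));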

  /** A SimpleFunction that converts a Word and Count into a printable string. */
  public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
    @Override
    public String apply(KV<String, Long> input) {
      return input.getKey() + ": " + input.getValue();
    }
  }
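
  // FormatAsTextFn is applied below as MapElements.via(new FormatAsTextFn());
  // extending SimpleFunction (rather than a plain SerializableFunction) lets
  // MapElements infer the output type without it being supplied explicitly.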

  /**
   * A PTransform that converts a PCollection containing lines of text into a PCollection of
   * formatted word counts.
   *
   * <p>Concept #3: This is a custom composite transform that bundles two transforms (ParDo and
   * Count) as a reusable PTransform subclass. Using composite transforms allows for easy reuse,
   * modular testing, and an improved monitoring experience.
   */
  public static class CountWords extends PTransform<PCollection<String>,
      PCollection<KV<String, Long>>> {
    @Override
    public PCollection<KV<String, Long>> apply(PCollection<String> lines) {

      // Convert lines of text into individual words.
      PCollection<String> words = lines.apply(
          ParDo.of(new ExtractWordsFn()));

      // Count the number of times each word occurs.
      PCollection<KV<String, Long>> wordCounts =
          words.apply(Count.<String>perElement());

      return wordCounts;
    }
  }
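
  // Because CountWords is self-contained, it can be asserted on directly in a
  // test, e.g. (an illustrative sketch; SimpleWordCountTest below does the same
  // with its own fixed input):
  //
  //   PCollection<KV<String, Long>> counts =
  //       p.apply(Create.of("hi", "hi bob")).apply(new CountWords());
  //   PAssert.that(counts).containsInAnyOrder(KV.of("hi", 2L), KV.of("bob", 1L));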

  /**
   * Options supported by {@link WordCount}.
   *
   * <p>Concept #4: Defining your own configuration options. Here, you can add your own arguments
   * to be processed by the command-line parser, and specify default values for them. You can then
   * access the options values in your pipeline code.
   *
   * <p>Inherits standard configuration options.
   */
  public static interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    String getInputFile();
    void setInputFile(String value);

    @Description("Path of the file to write to")
    String getOutput();
    void setOutput(String value);
  }
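
  // PipelineOptionsFactory binds command-line flags to these getters by name, so
  // a typical invocation passes flags such as (paths are illustrative):
  //
  //   --inputFile=/path/to/input.txt --output=/path/to/counts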

  public static void main(String[] args) {
    WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
        .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    // Concepts #2 and #3: Our pipeline applies the composite CountWords transform, and passes the
    // static FormatAsTextFn() to the MapElements transform.
    // TODO: remove withoutValidation once possible
    p.apply("ReadLines", TextIO.Read.from(options.getInputFile()).withoutValidation())
        .apply(new CountWords())
        .apply(MapElements.via(new FormatAsTextFn()))
        .apply("WriteCounts", TextIO.Write.to(options.getOutput()));

    p.run();
  }
}
SimpleWordCountTest.java (Spark runner test)
@@ -21,19 +21,14 @@
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertThat;

+import org.apache.beam.runners.spark.examples.WordCount;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.values.PCollection;

 import com.google.common.collect.ImmutableSet;
@@ -48,7 +43,6 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
-import java.util.regex.Pattern;

 /**
  * Simple word count test.
@@ -68,7 +62,8 @@ public void testInMem() throws Exception {
     Pipeline p = Pipeline.create(options);
     PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder
         .of()));
-    PCollection<String> output = inputWords.apply(new CountWords());
+    PCollection<String> output = inputWords.apply(new WordCount.CountWords())
+        .apply(MapElements.via(new WordCount.FormatAsTextFn()));

     PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET);

@@ -86,7 +81,8 @@ public void testOutputFile() throws Exception {
     Pipeline p = Pipeline.create(options);
     PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder
         .of()));
-    PCollection<String> output = inputWords.apply(new CountWords());
+    PCollection<String> output = inputWords.apply(new WordCount.CountWords())
+        .apply(MapElements.via(new WordCount.FormatAsTextFn()));

     File outputFile = testFolder.newFile();
     output.apply("WriteCounts", TextIO.Write.to(outputFile.getAbsolutePath()).withoutSharding());
@@ -97,64 +93,4 @@
     assertThat(Sets.newHashSet(FileUtils.readLines(outputFile)),
         containsInAnyOrder(EXPECTED_COUNT_SET.toArray()));
   }
-
-  /**
-   * A DoFn that tokenizes lines of text into individual words.
-   */
-  static class ExtractWordsFn extends DoFn<String, String> {
-    private static final Pattern WORD_BOUNDARY = Pattern.compile("[^a-zA-Z']+");
-    private final Aggregator<Long, Long> emptyLines =
-        createAggregator("emptyLines", new Sum.SumLongFn());
-
-    @Override
-    public void processElement(ProcessContext c) {
-      // Split the line into words.
-      String[] words = WORD_BOUNDARY.split(c.element());
-
-      // Keep track of the number of lines without any words encountered while tokenizing.
-      // This aggregator is visible in the monitoring UI when run using DataflowRunner.
-      if (words.length == 0) {
-        emptyLines.addValue(1L);
-      }
-
-      // Output each word encountered into the output PCollection.
-      for (String word : words) {
-        if (!word.isEmpty()) {
-          c.output(word);
-        }
-      }
-    }
-  }
-
-  /**
-   * A DoFn that converts a Word and Count into a printable string.
-   */
-  private static class FormatCountsFn extends DoFn<KV<String, Long>, String> {
-    @Override
-    public void processElement(ProcessContext c) {
-      c.output(c.element().getKey() + ": " + c.element().getValue());
-    }
-  }
-
-  /**
-   * A {@link PTransform} counting words.
-   */
-  public static class CountWords extends PTransform<PCollection<String>, PCollection<String>> {
-    @Override
-    public PCollection<String> apply(PCollection<String> lines) {
-
-      // Convert lines of text into individual words.
-      PCollection<String> words = lines.apply(
-          ParDo.of(new ExtractWordsFn()));
-
-      // Count the number of times each word occurs.
-      PCollection<KV<String, Long>> wordCounts =
-          words.apply(Count.<String>perElement());
-
-      // Format each word and count into a printable string.
-
-      return wordCounts.apply(ParDo.of(new FormatCountsFn()));
-    }
-
-  }
 }
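
The updated tests above exercise the duplicated WordCount end to end. For reference, the same pattern can be reproduced as a minimal, self-contained sketch; the class name, input strings, and expected counts below are illustrative stand-ins (the real test's WORDS and EXPECTED_COUNT_SET constants are not shown in this diff):

import java.util.Arrays;

import org.apache.beam.runners.spark.examples.WordCount;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;

public class WordCountSketchTest {
  public void testCountWords() {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
    PCollection<String> input = p.apply(
        Create.of(Arrays.asList("hi there", "hi")).withCoder(StringUtf8Coder.of()));
    // Count words, then format each (word, count) pair as "word: count".
    PCollection<String> output = input
        .apply(new WordCount.CountWords())
        .apply(MapElements.via(new WordCount.FormatAsTextFn()));
    // "hi" appears twice across the two lines, "there" once.
    PAssert.that(output).containsInAnyOrder("hi: 2", "there: 1");
    p.run();
  }
}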