From d09ee4c8847f98c7c6304234f3d25c4e3d642313 Mon Sep 17 00:00:00 2001
From: Jason Kuster
Date: Mon, 16 May 2016 18:57:45 -0700
Subject: [PATCH] Remove Flink, Spark dependencies on Beam examples and add
 Flink and Spark as supported runners in examples.

Signed-off-by: Jason Kuster
---
 examples/java/pom.xml                        |  24 ++
 runners/flink/runner/pom.xml                 |  11 -
 .../beam/runners/flink/TfIdfITCase.java      |  80 -------
 .../beam/runners/flink/WordCountITCase.java  |  77 -------
 runners/spark/pom.xml                        |  11 -
 .../apache/beam/runners/spark/TfIdfTest.java | 205 +++++++++++++++++-
 .../beam/runners/spark/io/NumShardsTest.java |  73 ++++++-
 7 files changed, 297 insertions(+), 184 deletions(-)
 delete mode 100644 runners/flink/runner/src/test/java/org/apache/beam/runners/flink/TfIdfITCase.java
 delete mode 100644 runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountITCase.java

diff --git a/examples/java/pom.xml b/examples/java/pom.xml
index e50b94a12602..b6027dad9b50 100644
--- a/examples/java/pom.xml
+++ b/examples/java/pom.xml
@@ -219,6 +219,30 @@
       <version>${project.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>flink-runner_2.10</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_2.10</artifactId>
+      <version>1.6.1</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming_2.10</artifactId>
+      <version>1.6.1</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>spark-runner</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
     <dependency>
       <groupId>com.google.api-client</groupId>
       <artifactId>google-api-client</artifactId>
diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml
index a53a386c2828..1a74b7105e06 100644
--- a/runners/flink/runner/pom.xml
+++ b/runners/flink/runner/pom.xml
@@ -111,17 +111,6 @@
     </dependency>
 
-    <dependency>
-      <groupId>org.apache.beam</groupId>
-      <artifactId>java-examples-all</artifactId>
-      <exclusions>
-        <exclusion>
-          <groupId>org.slf4j</groupId>
-          <artifactId>slf4j-jdk14</artifactId>
-        </exclusion>
-      </exclusions>
-      <scope>test</scope>
-    </dependency>
 
     <dependency>
       <groupId>org.apache.flink</groupId>
       <artifactId>flink-streaming-java_2.10</artifactId>
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/TfIdfITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/TfIdfITCase.java
deleted file mode 100644
index 547f3c3a4660..000000000000
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/TfIdfITCase.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import org.apache.beam.examples.complete.TfIdf;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.StringDelegateCoder;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.Keys;
-import org.apache.beam.sdk.transforms.RemoveDuplicates;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-
-import com.google.common.base.Joiner;
-
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-import java.net.URI;
-
-
-public class TfIdfITCase extends JavaProgramTestBase {
-
-  protected String resultPath;
-
-  public TfIdfITCase(){
-  }
-
-  static final String[] EXPECTED_RESULT = new String[] {
-      "a", "m", "n", "b", "c", "d"};
-
-  @Override
-  protected void preSubmit() throws Exception {
-    resultPath = getTempDirPath("result");
-  }
-
-  @Override
-  protected void postSubmit() throws Exception {
-    compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
-  }
-
-  @Override
-  protected void testProgram() throws Exception {
-
-    Pipeline pipeline = FlinkTestPipeline.createForBatch();
-
-    pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class));
-
-    PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf = pipeline
-        .apply(Create.of(
-            KV.of(new URI("x"), "a b c d"),
-            KV.of(new URI("y"), "a b c"),
-            KV.of(new URI("z"), "a m n")))
-        .apply(new TfIdf.ComputeTfIdf());
-
-    PCollection<String> words = wordToUriAndTfIdf
-        .apply(Keys.<String>create())
-        .apply(RemoveDuplicates.<String>create());
-
-    words.apply(TextIO.Write.to(resultPath));
-
-    pipeline.run();
-  }
-}
-
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountITCase.java
deleted file mode 100644
index 3254e7885db8..000000000000
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountITCase.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import org.apache.beam.examples.WordCount;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.values.PCollection;
-
-import com.google.common.base.Joiner;
-
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-import java.util.Arrays;
-import java.util.List;
-
-
-public class WordCountITCase extends JavaProgramTestBase {
-
-  protected String resultPath;
-
-  public WordCountITCase(){
-  }
-
-  static final String[] WORDS_ARRAY = new String[] {
-      "hi there", "hi", "hi sue bob",
-      "hi sue", "", "bob hi"};
-
-  static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
-
-  static final String[] COUNTS_ARRAY = new String[] {
-      "hi: 5", "there: 1", "sue: 2", "bob: 2"};
-
-  @Override
-  protected void preSubmit() throws Exception {
-    resultPath = getTempDirPath("result");
-  }
-
-  @Override
-  protected void postSubmit() throws Exception {
-    compareResultsByLinesInMemory(Joiner.on('\n').join(COUNTS_ARRAY), resultPath);
-  }
-
-  @Override
-  protected void testProgram() throws Exception {
-
-    Pipeline p = FlinkTestPipeline.createForBatch();
-
-    PCollection<String> input = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());
-
-    input
-        .apply(new WordCount.CountWords())
-        .apply(MapElements.via(new WordCount.FormatAsTextFn()))
-        .apply(TextIO.Write.to(resultPath));
-
-    p.run();
-  }
-}
-
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 5daf1e17e5e4..337625c8e4a2 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -84,17 +84,6 @@
     </dependency>
 
-    <dependency>
-      <groupId>org.apache.beam</groupId>
-      <artifactId>java-examples-all</artifactId>
-      <exclusions>
-
-        <exclusion>
-          <groupId>org.slf4j</groupId>
-          <artifactId>slf4j-jdk14</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
 
     <dependency>
       <groupId>org.apache.avro</groupId>
       <artifactId>avro-mapred</artifactId>
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
index 00c4657f5f16..05c9c750b982 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
@@ -18,18 +18,30 @@
 
 package org.apache.beam.runners.spark;
 
-import org.apache.beam.examples.complete.TfIdf;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringDelegateCoder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Keys;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.RemoveDuplicates;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.join.CoGbkResult;
+import org.apache.beam.sdk.transforms.join.CoGroupByKey;
+import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
 
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.net.URI;
 import java.util.Arrays;
@@ -52,7 +64,7 @@ public void testTfIdf() throws Exception {
         KV.of(new URI("x"), "a b c d"),
         KV.of(new URI("y"), "a b c"),
         KV.of(new URI("z"), "a m n")))
n"))) - .apply(new TfIdf.ComputeTfIdf()); + .apply(new ComputeTfIdf()); PCollection words = wordToUriAndTfIdf .apply(Keys.create()) @@ -64,4 +76,193 @@ public void testTfIdf() throws Exception { res.close(); } + /** + * A transform containing a basic TF-IDF pipeline. The input consists of KV objects + * where the key is the document's URI and the value is a piece + * of the document's content. The output is mapping from terms to + * scores for each document URI. + */ + public static class ComputeTfIdf + extends PTransform>, PCollection>>> { + public ComputeTfIdf() { } + + @Override + public PCollection>> apply( + PCollection> uriToContent) { + + // Compute the total number of documents, and + // prepare this singleton PCollectionView for + // use as a side input. + final PCollectionView totalDocuments = + uriToContent + .apply("GetURIs", Keys.create()) + .apply("RemoveDuplicateDocs", RemoveDuplicates.create()) + .apply(Count.globally()) + .apply(View.asSingleton()); + + // Create a collection of pairs mapping a URI to each + // of the words in the document associated with that that URI. + PCollection> uriToWords = uriToContent + .apply(ParDo.named("SplitWords").of( + new DoFn, KV>() { + @Override + public void processElement(ProcessContext c) { + URI uri = c.element().getKey(); + String line = c.element().getValue(); + for (String word : line.split("\\W+")) { + // Log INFO messages when the word “love” is found. + if (word.toLowerCase().equals("love")) { + LOG.info("Found {}", word.toLowerCase()); + } + + if (!word.isEmpty()) { + c.output(KV.of(uri, word.toLowerCase())); + } + } + } + })); + + // Compute a mapping from each word to the total + // number of documents in which it appears. + PCollection> wordToDocCount = uriToWords + .apply("RemoveDuplicateWords", RemoveDuplicates.>create()) + .apply(Values.create()) + .apply("CountDocs", Count.perElement()); + + // Compute a mapping from each URI to the total + // number of words in the document associated with that URI. + PCollection> uriToWordTotal = uriToWords + .apply("GetURIs2", Keys.create()) + .apply("CountWords", Count.perElement()); + + // Count, for each (URI, word) pair, the number of + // occurrences of that word in the document associated + // with the URI. + PCollection, Long>> uriAndWordToCount = uriToWords + .apply("CountWordDocPairs", Count.>perElement()); + + // Adjust the above collection to a mapping from + // (URI, word) pairs to counts into an isomorphic mapping + // from URI to (word, count) pairs, to prepare for a join + // by the URI key. + PCollection>> uriToWordAndCount = uriAndWordToCount + .apply(ParDo.named("ShiftKeys").of( + new DoFn, Long>, KV>>() { + @Override + public void processElement(ProcessContext c) { + URI uri = c.element().getKey().getKey(); + String word = c.element().getKey().getValue(); + Long occurrences = c.element().getValue(); + c.output(KV.of(uri, KV.of(word, occurrences))); + } + })); + + // Prepare to join the mapping of URI to (word, count) pairs with + // the mapping of URI to total word counts, by associating + // each of the input PCollection> with + // a tuple tag. Each input must have the same key type, URI + // in this case. The type parameter of the tuple tag matches + // the types of the values for each collection. 
+      final TupleTag<Long> wordTotalsTag = new TupleTag<Long>();
+      final TupleTag<KV<String, Long>> wordCountsTag = new TupleTag<KV<String, Long>>();
+      KeyedPCollectionTuple<URI> coGbkInput = KeyedPCollectionTuple
+          .of(wordTotalsTag, uriToWordTotal)
+          .and(wordCountsTag, uriToWordAndCount);
+
+      // Perform a CoGroupByKey (a sort of pre-join) on the prepared
+      // inputs. This yields a mapping from URI to a CoGbkResult
+      // (CoGroupByKey Result). The CoGbkResult is a mapping
+      // from the above tuple tags to the values in each input
+      // associated with a particular URI. In this case, each
+      // KV<URI, CoGbkResult> groups a URI with the total number of
+      // words in that document as well as all the (word, count)
+      // pairs for particular words.
+      PCollection<KV<URI, CoGbkResult>> uriToWordAndCountAndTotal = coGbkInput
+          .apply("CoGroupByUri", CoGroupByKey.<URI>create());
+
+      // Compute a mapping from each word to a (URI, term frequency)
+      // pair for each URI. A word's term frequency for a document
+      // is simply the number of times that word occurs in the document
+      // divided by the total number of words in the document.
+      PCollection<KV<String, KV<URI, Double>>> wordToUriAndTf = uriToWordAndCountAndTotal
+          .apply(ParDo.named("ComputeTermFrequencies").of(
+              new DoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() {
+                @Override
+                public void processElement(ProcessContext c) {
+                  URI uri = c.element().getKey();
+                  Long wordTotal = c.element().getValue().getOnly(wordTotalsTag);
+
+                  for (KV<String, Long> wordAndCount
+                      : c.element().getValue().getAll(wordCountsTag)) {
+                    String word = wordAndCount.getKey();
+                    Long wordCount = wordAndCount.getValue();
+                    Double termFrequency = wordCount.doubleValue() / wordTotal.doubleValue();
+                    c.output(KV.of(word, KV.of(uri, termFrequency)));
+                  }
+                }
+              }));
+
+      // Compute a mapping from each word to its document frequency.
+      // A word's document frequency in a corpus is the number of
+      // documents in which the word appears divided by the total
+      // number of documents in the corpus. Note how the total number of
+      // documents is passed as a side input; the same value is
+      // presented to each invocation of the DoFn.
+      PCollection<KV<String, Double>> wordToDf = wordToDocCount
+          .apply(ParDo
+              .named("ComputeDocFrequencies")
+              .withSideInputs(totalDocuments)
+              .of(new DoFn<KV<String, Long>, KV<String, Double>>() {
+                @Override
+                public void processElement(ProcessContext c) {
+                  String word = c.element().getKey();
+                  Long documentCount = c.element().getValue();
+                  Long documentTotal = c.sideInput(totalDocuments);
+                  Double documentFrequency = documentCount.doubleValue()
+                      / documentTotal.doubleValue();
+
+                  c.output(KV.of(word, documentFrequency));
+                }
+              }));
+
+      // Join the term frequency and document frequency
+      // collections, each keyed on the word.
+      final TupleTag<KV<URI, Double>> tfTag = new TupleTag<KV<URI, Double>>();
+      final TupleTag<Double> dfTag = new TupleTag<Double>();
+      PCollection<KV<String, CoGbkResult>> wordToUriAndTfAndDf = KeyedPCollectionTuple
+          .of(tfTag, wordToUriAndTf)
+          .and(dfTag, wordToDf)
+          .apply(CoGroupByKey.<String>create());
+
+      // Compute a mapping from each word to a (URI, TF-IDF) score
+      // for each URI. There are a variety of definitions of TF-IDF
+      // ("term frequency - inverse document frequency") score;
+      // here we use a basic version that multiplies the term frequency
+      // by the log of the inverse document frequency.
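+      // For example, a word that occurs in half of the documents
+      // (df = 0.5) is weighted by log(1 / 0.5) = log(2), roughly 0.693,
+      // times its term frequency in each document.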
+      PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf = wordToUriAndTfAndDf
+          .apply(ParDo.named("ComputeTfIdf").of(
+              new DoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() {
+                @Override
+                public void processElement(ProcessContext c) {
+                  String word = c.element().getKey();
+                  Double df = c.element().getValue().getOnly(dfTag);
+
+                  for (KV<URI, Double> uriAndTf : c.element().getValue().getAll(tfTag)) {
+                    URI uri = uriAndTf.getKey();
+                    Double tf = uriAndTf.getValue();
+                    Double tfIdf = tf * Math.log(1 / df);
+                    c.output(KV.of(word, KV.of(uri, tfIdf)));
+                  }
+                }
+              }));
+
+      return wordToUriAndTfIdf;
+    }
+
+    // Instantiate Logger.
+    // It is suggested that the user specify the class name of the containing class
+    // (in this case ComputeTfIdf).
+    private static final Logger LOG = LoggerFactory.getLogger(ComputeTfIdf.class);
+  }
+
 }
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
index 8ce35c446b4c..7eb77b83ea42 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
@@ -21,7 +21,6 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-import org.apache.beam.examples.WordCount;
 import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.SparkPipelineRunner;
@@ -29,8 +28,16 @@
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 
 import com.google.common.base.Charsets;
@@ -61,6 +68,66 @@ public class NumShardsTest {
 
   private File outputDir;
 
+  /**
+   * Concept #2: You can make your pipeline code less verbose by defining your DoFns statically
+   * out-of-line. This DoFn tokenizes lines of text into individual words; we pass it to a ParDo
+   * in the pipeline.
+   */
+  static class ExtractWordsFn extends DoFn<String, String> {
+    private final Aggregator<Long, Long> emptyLines =
+        createAggregator("emptyLines", new Sum.SumLongFn());
+
+    @Override
+    public void processElement(ProcessContext c) {
+      if (c.element().trim().isEmpty()) {
+        emptyLines.addValue(1L);
+      }
+
+      // Split the line into words.
+      String[] words = c.element().split("[^a-zA-Z']+");
+
+      // Output each word encountered into the output PCollection.
+      for (String word : words) {
+        if (!word.isEmpty()) {
+          c.output(word);
+        }
+      }
+    }
+  }
+
+  /** A SimpleFunction that converts a Word and Count into a printable string. */
+  public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
+    @Override
+    public String apply(KV<String, Long> input) {
+      return input.getKey() + ": " + input.getValue();
+    }
+  }
+
+  /**
+   * A PTransform that converts a PCollection containing lines of text into a PCollection of
+   * formatted word counts.
+   *
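+   * <p>For example, the input lines {"hi there", "hi"} yield the counts
+   * (hi, 2) and (there, 1).
+   *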
+   * <p>Concept #3: This is a custom composite transform that bundles two transforms (ParDo and
+   * Count) as a reusable PTransform subclass. Using composite transforms allows for easy reuse,
+   * modular testing, and an improved monitoring experience.
+   */
+  public static class CountWords extends PTransform<PCollection<String>,
+      PCollection<KV<String, Long>>> {
+    @Override
+    public PCollection<KV<String, Long>> apply(PCollection<String> lines) {
+
+      // Convert lines of text into individual words.
+      PCollection<String> words = lines.apply(
+          ParDo.of(new ExtractWordsFn()));
+
+      // Count the number of times each word occurs.
+      PCollection<KV<String, Long>> wordCounts =
+          words.apply(Count.<String>perElement());
+
+      return wordCounts;
+    }
+  }
+
   @Rule
   public final TemporaryFolder tmpDir = new TemporaryFolder();
 
@@ -76,8 +143,8 @@ public void testText() throws Exception {
     options.setRunner(SparkPipelineRunner.class);
     Pipeline p = Pipeline.create(options);
     PCollection<String> inputWords = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());
-    PCollection<String> output = inputWords.apply(new WordCount.CountWords())
-        .apply(MapElements.via(new WordCount.FormatAsTextFn()));
+    PCollection<String> output = inputWords.apply(new CountWords())
+        .apply(MapElements.via(new FormatAsTextFn()));
     output.apply(TextIO.Write.to(outputDir.getAbsolutePath()).withNumShards(3).withSuffix(".txt"));
     EvaluationResult res = SparkPipelineRunner.create().run(p);
     res.close();