diff --git a/examples/java/pom.xml b/examples/java/pom.xml
index 3b67797431c9..57acec3f4787 100644
--- a/examples/java/pom.xml
+++ b/examples/java/pom.xml
@@ -270,12 +270,26 @@
runtime
+
+ org.apache.beam
+ beam-runners-flink_2.10
+ ${project.version}
+ runtime
+
+
org.apache.beam
beam-runners-google-cloud-dataflow-java
${project.version}
+
+ org.apache.beam
+ beam-runners-spark
+ ${project.version}
+ runtime
+
+
org.slf4j
slf4j-jdk14
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java b/examples/java/src/test/java/org/apache/beam/examples/TfIdfIT.java
similarity index 76%
rename from runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
rename to examples/java/src/test/java/org/apache/beam/examples/TfIdfIT.java
index df78338d4269..45b1ca23e10b 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/TfIdfIT.java
@@ -16,13 +16,17 @@
* limitations under the License.
*/
-package org.apache.beam.runners.spark;
+package org.apache.beam.examples;
import org.apache.beam.examples.complete.TfIdf;
+import org.apache.beam.runners.spark.EvaluationResult;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringDelegateCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Keys;
import org.apache.beam.sdk.transforms.RemoveDuplicates;
@@ -30,20 +34,23 @@
import org.apache.beam.sdk.values.PCollection;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
import java.net.URI;
import java.util.Arrays;
/**
- * A test based on {@code TfIdf} from the SDK.
+ * End-to-end tests of {@link TfIdf}.
*/
-public class TfIdfTest {
+@RunWith(JUnit4.class)
+public class TfIdfIT {
@Test
- public void testTfIdf() throws Exception {
- SparkPipelineOptions opts = PipelineOptionsFactory.as(SparkPipelineOptions.class);
- opts.setRunner(SparkRunner.class);
- Pipeline pipeline = Pipeline.create(opts);
+ public void testE2ETfIdfSpark() throws Exception {
+ SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
+ options.setRunner(SparkRunner.class);
+ Pipeline pipeline = TestPipeline.create(options);
pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class));
@@ -63,5 +70,4 @@ public void testTfIdf() throws Exception {
EvaluationResult res = SparkRunner.create().run(pipeline);
res.close();
}
-
}
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 94c42bd663f6..66215deb1355 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -221,17 +221,6 @@
-
- org.apache.beam
- beam-examples-java
-
-
-
- org.slf4j
- slf4j-jdk14
-
-
-
org.apache.avro
avro-mapred
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
index b4268d6127c1..dbee4ac1abef 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
@@ -21,7 +21,6 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import org.apache.beam.examples.WordCount;
import org.apache.beam.runners.spark.EvaluationResult;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.SparkRunner;
@@ -29,8 +28,11 @@
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import com.google.common.base.Charsets;
@@ -55,8 +57,7 @@
public class NumShardsTest {
private static final String[] WORDS_ARRAY = {
- "hi there", "hi", "hi sue bob",
- "hi sue", "", "bob hi"};
+ "hi", "there", "hi", "hi", "sue", "bob", "hi", "sue", "", "bob", "hi"};
private static final List WORDS = Arrays.asList(WORDS_ARRAY);
private File outputDir;
@@ -70,14 +71,22 @@ public void setUp() throws IOException {
outputDir.delete();
}
+ /** A SimpleFunction that converts a Word and Count into a printable string. */
+ private static class FormatAsTextFn extends SimpleFunction, String> {
+ @Override
+ public String apply(KV input) {
+ return input.getKey() + ": " + input.getValue();
+ }
+ }
+
@Test
public void testText() throws Exception {
SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
Pipeline p = Pipeline.create(options);
PCollection inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of()));
- PCollection output = inputWords.apply(new WordCount.CountWords())
- .apply(MapElements.via(new WordCount.FormatAsTextFn()));
+ PCollection output = inputWords.apply(Count.perElement())
+ .apply(MapElements.via(new FormatAsTextFn()));
output.apply(TextIO.Write.to(outputDir.getAbsolutePath()).withNumShards(3).withSuffix(".txt"));
EvaluationResult res = SparkRunner.create().run(p);
res.close();
@@ -97,5 +106,4 @@ public void testText() throws Exception {
assertEquals(3, count);
assertTrue(expected.isEmpty());
}
-
}