Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions examples/java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -270,12 +270,26 @@
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-flink_2.10</artifactId>
<version>${project.version}</version>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-spark</artifactId>
<version>${project.version}</version>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-jdk14</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,34 +16,41 @@
* limitations under the License.
*/

package org.apache.beam.runners.spark;
package org.apache.beam.examples;

import org.apache.beam.examples.complete.TfIdf;
import org.apache.beam.runners.spark.EvaluationResult;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringDelegateCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Keys;
import org.apache.beam.sdk.transforms.RemoveDuplicates;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

import java.net.URI;
import java.util.Arrays;

/**
* A test based on {@code TfIdf} from the SDK.
* End-to-end tests of {@link TfIdf}.
*/
public class TfIdfTest {
@RunWith(JUnit4.class)
public class TfIdfIT {

@Test
public void testTfIdf() throws Exception {
SparkPipelineOptions opts = PipelineOptionsFactory.as(SparkPipelineOptions.class);
opts.setRunner(SparkRunner.class);
Pipeline pipeline = Pipeline.create(opts);
public void testE2ETfIdfSpark() throws Exception {
SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you use PipelineOptionsFactory.fromArgs to be runner-agnostic? Is there a benefit in that? Maybe apply the same to all runners?

options.setRunner(SparkRunner.class);
Pipeline pipeline = TestPipeline.create(options);

pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class));

Expand All @@ -63,5 +70,4 @@ public void testTfIdf() throws Exception {
EvaluationResult res = SparkRunner.create().run(pipeline);
res.close();
}

}
11 changes: 0 additions & 11 deletions runners/spark/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -221,17 +221,6 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-examples-java</artifactId>
<exclusions>
<!-- Use Hadoop/Spark's backend logger instead of jdk14 for tests -->
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-jdk14</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-mapred</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,18 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import org.apache.beam.examples.WordCount;
import org.apache.beam.runners.spark.EvaluationResult;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

import com.google.common.base.Charsets;
Expand All @@ -55,8 +57,7 @@
public class NumShardsTest {

private static final String[] WORDS_ARRAY = {
"hi there", "hi", "hi sue bob",
"hi sue", "", "bob hi"};
"hi", "there", "hi", "hi", "sue", "bob", "hi", "sue", "", "bob", "hi"};
private static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);

private File outputDir;
Expand All @@ -70,14 +71,22 @@ public void setUp() throws IOException {
outputDir.delete();
}

/**
 * Renders a word together with its count as one printable line.
 *
 * <p>The output has the form {@code key: value}, i.e. the pair's key, a colon and a space,
 * then the pair's value.
 */
private static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
  @Override
  public String apply(KV<String, Long> input) {
    final String word = input.getKey();
    final Long occurrences = input.getValue();
    return word + ": " + occurrences;
  }
}

@Test
public void testText() throws Exception {
SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
Pipeline p = Pipeline.create(options);
PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of()));
PCollection<String> output = inputWords.apply(new WordCount.CountWords())
.apply(MapElements.via(new WordCount.FormatAsTextFn()));
PCollection<String> output = inputWords.apply(Count.<String>perElement())
.apply(MapElements.via(new FormatAsTextFn()));
output.apply(TextIO.Write.to(outputDir.getAbsolutePath()).withNumShards(3).withSuffix(".txt"));
EvaluationResult res = SparkRunner.create().run(p);
res.close();
Expand All @@ -97,5 +106,4 @@ public void testText() throws Exception {
assertEquals(3, count);
assertTrue(expected.isEmpty());
}

}