Run {@link injector.Injector} to generate pubsub data for this pipeline. The Injector
- * documentation provides more detail.
+ *
Run {@literal com.google.cloud.dataflow.examples.complete.game.injector.Injector} to generate
+ * pubsub data for this pipeline. The {@literal Injector} documentation provides more detail.
*
*
To execute this pipeline using the Dataflow service, specify the pipeline configuration
* like this:
@@ -95,14 +94,10 @@
public class GameStats extends LeaderBoard {
private static final String TIMESTAMP_ATTRIBUTE = "timestamp_ms";
- private static final Logger LOG = LoggerFactory.getLogger(GameStats.class);
private static DateTimeFormatter fmt =
DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS")
.withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("PST")));
- static final Duration FIVE_MINUTES = Duration.standardMinutes(5);
- static final Duration TEN_MINUTES = Duration.standardMinutes(10);
-
/**
* Filter out all but those users with a high clickrate, which we will consider as 'spammy' uesrs.
@@ -172,11 +167,6 @@ public void processElement(ProcessContext c) {
* Options supported by {@link GameStats}.
*/
static interface Options extends LeaderBoard.Options {
- @Description("Pub/Sub topic to read from")
- @Validation.Required
- String getTopic();
- void setTopic(String value);
-
@Description("Numeric value of fixed window duration for user analysis, in minutes")
@Default.Integer(60)
Integer getFixedWindowDuration();
@@ -299,7 +289,9 @@ public void processElement(ProcessContext c) {
// If the user is not in the spammers Map, output the data element.
if (c.sideInput(spammersView).get(c.element().getUser().trim()) == null) {
c.output(c.element());
- }}}))
+ }
+ }
+ }))
// Extract and sum teamname/score pairs from the event data.
.apply("ExtractTeamScore", new ExtractAndSumScore("team"))
// [END DocInclude_FilterAndCalc]
diff --git a/examples/src/main/java8/com/google/cloud/dataflow/examples/complete/game/HourlyTeamScore.java b/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/HourlyTeamScore.java
similarity index 100%
rename from examples/src/main/java8/com/google/cloud/dataflow/examples/complete/game/HourlyTeamScore.java
rename to java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/HourlyTeamScore.java
diff --git a/examples/src/main/java8/com/google/cloud/dataflow/examples/complete/game/LeaderBoard.java b/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/LeaderBoard.java
similarity index 100%
rename from examples/src/main/java8/com/google/cloud/dataflow/examples/complete/game/LeaderBoard.java
rename to java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/LeaderBoard.java
diff --git a/examples/src/main/java8/com/google/cloud/dataflow/examples/complete/game/README.md b/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/README.md
similarity index 99%
rename from examples/src/main/java8/com/google/cloud/dataflow/examples/complete/game/README.md
rename to java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/README.md
index 4cad16d5f59f..79b55cee112a 100644
--- a/examples/src/main/java8/com/google/cloud/dataflow/examples/complete/game/README.md
+++ b/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/README.md
@@ -111,9 +111,3 @@ you can override when you start up the pipeline if those tables already exist.
Depending on the windowing intervals defined in a given pipeline, you may have
to wait for a while (more than an hour) before you start to see results written
to the BigQuery tables.
-
-
-
-
-
-
diff --git a/examples/src/main/java8/com/google/cloud/dataflow/examples/complete/game/UserScore.java b/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/UserScore.java
similarity index 100%
rename from examples/src/main/java8/com/google/cloud/dataflow/examples/complete/game/UserScore.java
rename to java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/UserScore.java
diff --git a/examples/src/main/java8/com/google/cloud/dataflow/examples/complete/game/injector/Injector.java b/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/Injector.java
similarity index 97%
rename from examples/src/main/java8/com/google/cloud/dataflow/examples/complete/game/injector/Injector.java
rename to java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/Injector.java
index d47886db43ab..1691c548d419 100644
--- a/examples/src/main/java8/com/google/cloud/dataflow/examples/complete/game/injector/Injector.java
+++ b/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/Injector.java
@@ -19,17 +19,17 @@
import com.google.api.services.pubsub.Pubsub;
import com.google.api.services.pubsub.model.PublishRequest;
import com.google.api.services.pubsub.model.PubsubMessage;
-
import com.google.common.collect.ImmutableMap;
+
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
+
import java.io.BufferedOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
-import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -224,7 +224,6 @@ private static synchronized TeamInfo addLiveTeam() {
if (random.nextInt(ROBOT_PROBABILITY) == 0) {
robot = "Robot-" + random.nextInt(NUM_ROBOTS);
}
- long currTime = System.currentTimeMillis();
// Create the new team.
TeamInfo newTeam = new TeamInfo(teamName, System.currentTimeMillis(), robot);
liveTeams.add(newTeam);
@@ -245,7 +244,7 @@ private static String generateEvent(Long currTime, int delayInMillis) {
TeamInfo team = randomTeam(liveTeams);
String teamName = team.getTeamName();
String user;
- int PARSE_ERROR_RATE = 900000;
+ final int parseErrorRate = 900000;
String robot = team.getRobot();
// If the team has an associated robot team member...
@@ -264,7 +263,7 @@ private static String generateEvent(Long currTime, int delayInMillis) {
String event = user + "," + teamName + "," + random.nextInt(MAX_SCORE);
// Randomly introduce occasional parse errors. You can see a custom counter tracking the number
// of such errors in the Dataflow Monitoring UI, as the example pipeline runs.
- if (random.nextInt(PARSE_ERROR_RATE) == 0) {
+ if (random.nextInt(parseErrorRate) == 0) {
System.out.println("Introducing a parse error.");
event = "THIS LINE REPRESENTS CORRUPT DATA AND WILL CAUSE A PARSE ERROR";
}
@@ -316,7 +315,6 @@ public static void publishData(int numMessages, int delayInMillis)
*/
public static void publishDataToFile(String fileName, int numMessages, int delayInMillis)
throws IOException {
- List pubsubMessages = new ArrayList<>();
PrintWriter out = new PrintWriter(new OutputStreamWriter(
new BufferedOutputStream(new FileOutputStream(fileName, true)), "UTF-8"));
@@ -337,8 +335,7 @@ public static void publishDataToFile(String fileName, int numMessages, int delay
}
- public static void main(String[] args)
- throws GeneralSecurityException, IOException, InterruptedException {
+ public static void main(String[] args) throws IOException, InterruptedException {
if (args.length < 3) {
System.out.println("Usage: Injector project-name (topic-name|none) (filename|none)");
System.exit(1);
@@ -400,6 +397,7 @@ public static void main(String[] args)
} else { // Write to PubSub.
// Start a thread to inject some data.
new Thread(){
+ @Override
public void run() {
try {
publishData(numMessages, delayInMillis);
diff --git a/examples/src/main/java8/com/google/cloud/dataflow/examples/complete/game/injector/InjectorUtils.java b/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/InjectorUtils.java
similarity index 100%
rename from examples/src/main/java8/com/google/cloud/dataflow/examples/complete/game/injector/InjectorUtils.java
rename to java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/InjectorUtils.java
diff --git a/examples/src/main/java8/com/google/cloud/dataflow/examples/complete/game/injector/RetryHttpInitializerWrapper.java b/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/RetryHttpInitializerWrapper.java
similarity index 99%
rename from examples/src/main/java8/com/google/cloud/dataflow/examples/complete/game/injector/RetryHttpInitializerWrapper.java
rename to java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/RetryHttpInitializerWrapper.java
index eeeabcef8beb..14375342274a 100644
--- a/examples/src/main/java8/com/google/cloud/dataflow/examples/complete/game/injector/RetryHttpInitializerWrapper.java
+++ b/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/injector/RetryHttpInitializerWrapper.java
@@ -124,4 +124,3 @@ public boolean handleResponse(
.setSleeper(sleeper));
}
}
-
diff --git a/examples/src/main/java8/com/google/cloud/dataflow/examples/complete/game/utils/WriteToBigQuery.java b/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/utils/WriteToBigQuery.java
similarity index 100%
rename from examples/src/main/java8/com/google/cloud/dataflow/examples/complete/game/utils/WriteToBigQuery.java
rename to java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/utils/WriteToBigQuery.java
diff --git a/examples/src/main/java8/com/google/cloud/dataflow/examples/complete/game/utils/WriteWindowedToBigQuery.java b/java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/utils/WriteWindowedToBigQuery.java
similarity index 100%
rename from examples/src/main/java8/com/google/cloud/dataflow/examples/complete/game/utils/WriteWindowedToBigQuery.java
rename to java8examples/src/main/java/com/google/cloud/dataflow/examples/complete/game/utils/WriteWindowedToBigQuery.java
diff --git a/examples/src/test/java8/com/google/cloud/dataflow/examples/MinimalWordCountJava8Test.java b/java8examples/src/test/java/com/google/cloud/dataflow/examples/MinimalWordCountJava8Test.java
similarity index 100%
rename from examples/src/test/java8/com/google/cloud/dataflow/examples/MinimalWordCountJava8Test.java
rename to java8examples/src/test/java/com/google/cloud/dataflow/examples/MinimalWordCountJava8Test.java
diff --git a/examples/src/test/java8/com/google/cloud/dataflow/examples/complete/game/GameStatsTest.java b/java8examples/src/test/java/com/google/cloud/dataflow/examples/complete/game/GameStatsTest.java
similarity index 69%
rename from examples/src/test/java8/com/google/cloud/dataflow/examples/complete/game/GameStatsTest.java
rename to java8examples/src/test/java/com/google/cloud/dataflow/examples/complete/game/GameStatsTest.java
index 4795de2fc32d..f77d1461fa99 100644
--- a/examples/src/test/java8/com/google/cloud/dataflow/examples/complete/game/GameStatsTest.java
+++ b/java8examples/src/test/java/com/google/cloud/dataflow/examples/complete/game/GameStatsTest.java
@@ -16,33 +16,15 @@
package com.google.cloud.dataflow.examples.complete.game;
-import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.dataflow.examples.complete.game.GameStats.CalculateSpammyUsers;
-import com.google.cloud.dataflow.examples.complete.game.UserScore.GameActionInfo;
-import com.google.cloud.dataflow.examples.complete.game.UserScore.ParseEventFn;
import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
import com.google.cloud.dataflow.sdk.testing.RunnableOnService;
import com.google.cloud.dataflow.sdk.testing.TestPipeline;
import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.DoFnTester;
-import com.google.cloud.dataflow.sdk.transforms.Filter;
-import com.google.cloud.dataflow.sdk.transforms.MapElements;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.Sum;
-import com.google.cloud.dataflow.sdk.transforms.WithTimestamps;
-import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Sessions;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
-import org.hamcrest.CoreMatchers;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -62,7 +44,7 @@
public class GameStatsTest implements Serializable {
// User scores
- static final KV[] USER_SCORES_ARRAY = new KV[] {
+ static final List> USER_SCORES = Arrays.asList(
KV.of("Robot-2", 66), KV.of("Robot-1", 116), KV.of("user7_AndroidGreenKookaburra", 23),
KV.of("user7_AndroidGreenKookaburra", 1),
KV.of("user19_BisqueBilby", 14), KV.of("user13_ApricotQuokka", 15),
@@ -70,16 +52,11 @@ public class GameStatsTest implements Serializable {
KV.of("user2_AmberQuokka", 6), KV.of("user0_MagentaKangaroo", 4),
KV.of("user0_MagentaKangaroo", 3), KV.of("user2_AmberCockatoo", 13),
KV.of("user7_AlmondWallaby", 15), KV.of("user6_AmberNumbat", 11),
- KV.of("user6_AmberQuokka", 4)
- };
-
- static final List> USER_SCORES = Arrays.asList(USER_SCORES_ARRAY);
+ KV.of("user6_AmberQuokka", 4));
// The expected list of 'spammers'.
- static final KV[] SPAMMERS = new KV[] {
- KV.of("Robot-2", 66), KV.of("Robot-1", 116)
- };
-
+ static final List> SPAMMERS = Arrays.asList(
+ KV.of("Robot-2", 66), KV.of("Robot-1", 116));
/** Test the calculation of 'spammy users'. */
@Test
diff --git a/examples/src/test/java8/com/google/cloud/dataflow/examples/complete/game/HourlyTeamScoreTest.java b/java8examples/src/test/java/com/google/cloud/dataflow/examples/complete/game/HourlyTeamScoreTest.java
similarity index 89%
rename from examples/src/test/java8/com/google/cloud/dataflow/examples/complete/game/HourlyTeamScoreTest.java
rename to java8examples/src/test/java/com/google/cloud/dataflow/examples/complete/game/HourlyTeamScoreTest.java
index fe163037f63c..f77a5d4488fc 100644
--- a/examples/src/test/java8/com/google/cloud/dataflow/examples/complete/game/HourlyTeamScoreTest.java
+++ b/java8examples/src/test/java/com/google/cloud/dataflow/examples/complete/game/HourlyTeamScoreTest.java
@@ -16,8 +16,6 @@
package com.google.cloud.dataflow.examples.complete.game;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.cloud.dataflow.examples.complete.game.UserScore.ExtractAndSumScore;
import com.google.cloud.dataflow.examples.complete.game.UserScore.GameActionInfo;
import com.google.cloud.dataflow.examples.complete.game.UserScore.ParseEventFn;
import com.google.cloud.dataflow.sdk.Pipeline;
@@ -26,22 +24,14 @@
import com.google.cloud.dataflow.sdk.testing.RunnableOnService;
import com.google.cloud.dataflow.sdk.testing.TestPipeline;
import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.DoFnTester;
import com.google.cloud.dataflow.sdk.transforms.Filter;
import com.google.cloud.dataflow.sdk.transforms.MapElements;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.Sum;
-import com.google.cloud.dataflow.sdk.transforms.WithTimestamps;
-import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
-import org.hamcrest.CoreMatchers;
-import org.joda.time.Duration;
import org.joda.time.Instant;
-import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
diff --git a/examples/src/test/java8/com/google/cloud/dataflow/examples/complete/game/UserScoreTest.java b/java8examples/src/test/java/com/google/cloud/dataflow/examples/complete/game/UserScoreTest.java
similarity index 96%
rename from examples/src/test/java8/com/google/cloud/dataflow/examples/complete/game/UserScoreTest.java
rename to java8examples/src/test/java/com/google/cloud/dataflow/examples/complete/game/UserScoreTest.java
index 69601be1bb4d..641e2c3990d9 100644
--- a/examples/src/test/java8/com/google/cloud/dataflow/examples/complete/game/UserScoreTest.java
+++ b/java8examples/src/test/java/com/google/cloud/dataflow/examples/complete/game/UserScoreTest.java
@@ -28,12 +28,10 @@
import com.google.cloud.dataflow.sdk.transforms.DoFnTester;
import com.google.cloud.dataflow.sdk.transforms.MapElements;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.Sum;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
-import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -72,17 +70,17 @@ public class UserScoreTest implements Serializable {
static final List GAME_EVENTS = Arrays.asList(GAME_EVENTS_ARRAY);
static final List GAME_EVENTS2 = Arrays.asList(GAME_EVENTS_ARRAY2);
- static final KV[] USER_SUMS = new KV[] {
+ static final List> USER_SUMS = Arrays.asList(
KV.of("user0_MagentaKangaroo", 3), KV.of("user13_ApricotQuokka", 15),
KV.of("user6_AmberNumbat", 11), KV.of("user7_AlmondWallaby", 15),
KV.of("user7_AndroidGreenKookaburra", 23),
- KV.of("user19_BisqueBilby", 14) };
+ KV.of("user19_BisqueBilby", 14));
- static final KV[] TEAM_SUMS = new KV[] {
+ static final List> TEAM_SUMS = Arrays.asList(
KV.of("MagentaKangaroo", 3), KV.of("ApricotQuokka", 15),
KV.of("AmberNumbat", 11), KV.of("AlmondWallaby", 15),
KV.of("AndroidGreenKookaburra", 23),
- KV.of("BisqueBilby", 14) };
+ KV.of("BisqueBilby", 14));
/** Test the ParseEventFn DoFn. */
@Test
diff --git a/java8tests/pom.xml b/java8tests/pom.xml
new file mode 100644
index 000000000000..de44ed49b7b7
--- /dev/null
+++ b/java8tests/pom.xml
@@ -0,0 +1,183 @@
+
+
+
+ 4.0.0
+
+
+ com.google.cloud.dataflow
+ google-cloud-dataflow-java-sdk-parent
+ 1.6.0-SNAPSHOT
+
+
+ com.google.cloud.dataflow
+ google-cloud-dataflow-java-java8tests-all
+ Google Cloud Dataflow Java 8 Tests - All
+ Google Cloud Dataflow Java SDK provides a simple, Java-based
+ interface for processing virtually any size data using Google cloud
+ resources. This artifact includes tests of the SDK from a Java 8
+ user.
+ http://cloud.google.com/dataflow
+
+ jar
+
+
+
+ DataflowPipelineTests
+
+ true
+ com.google.cloud.dataflow.sdk.testing.RunnableOnService
+ both
+
+
+
+
+
+
+
+ maven-compiler-plugin
+
+ 1.8
+ 1.8
+
+
+
+
+ org.apache.maven.plugins
+ maven-dependency-plugin
+
+
+ analyze-only
+
+ true
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-checkstyle-plugin
+ 2.12
+
+
+ com.puppycrawl.tools
+ checkstyle
+ 6.6
+
+
+
+ ../checkstyle.xml
+ true
+ true
+ true
+ false
+
+
+
+
+ check
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-source-plugin
+ 2.4
+
+
+ attach-sources
+ compile
+
+ jar
+
+
+
+ attach-test-sources
+ test-compile
+
+ test-jar
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+
+
+ default-jar
+
+ jar
+
+
+
+ default-test-jar
+
+ test-jar
+
+
+
+
+
+
+
+ org.jacoco
+ jacoco-maven-plugin
+
+
+
+
+
+
+ com.google.cloud.dataflow
+ google-cloud-dataflow-java-sdk-all
+ ${project.version}
+
+
+
+ com.google.guava
+ guava
+ ${guava.version}
+
+
+
+ joda-time
+ joda-time
+ ${joda.version}
+
+
+
+ org.hamcrest
+ hamcrest-all
+ ${hamcrest.version}
+ test
+
+
+
+ junit
+ junit
+ ${junit.version}
+ test
+
+
+
diff --git a/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/CombineJava8Test.java b/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/CombineJava8Test.java
similarity index 100%
rename from sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/CombineJava8Test.java
rename to java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/CombineJava8Test.java
diff --git a/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/FilterJava8Test.java b/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/FilterJava8Test.java
similarity index 100%
rename from sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/FilterJava8Test.java
rename to java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/FilterJava8Test.java
diff --git a/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/FlatMapElementsJava8Test.java b/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/FlatMapElementsJava8Test.java
similarity index 100%
rename from sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/FlatMapElementsJava8Test.java
rename to java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/FlatMapElementsJava8Test.java
diff --git a/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/MapElementsJava8Test.java b/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/MapElementsJava8Test.java
similarity index 100%
rename from sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/MapElementsJava8Test.java
rename to java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/MapElementsJava8Test.java
diff --git a/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/PartitionJava8Test.java b/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/PartitionJava8Test.java
similarity index 100%
rename from sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/PartitionJava8Test.java
rename to java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/PartitionJava8Test.java
diff --git a/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/RemoveDuplicatesJava8Test.java b/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/RemoveDuplicatesJava8Test.java
similarity index 99%
rename from sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/RemoveDuplicatesJava8Test.java
rename to java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/RemoveDuplicatesJava8Test.java
index d9e2180b7da7..dfa1ca64a5e6 100644
--- a/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/RemoveDuplicatesJava8Test.java
+++ b/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/RemoveDuplicatesJava8Test.java
@@ -96,4 +96,3 @@ public void withLambdaRepresentativeValuesFnNoTypeDescriptorShouldThrow() {
RemoveDuplicates.withRepresentativeValueFn((String s) -> s.length()));
}
}
-
diff --git a/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/WithKeysJava8Test.java b/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/WithKeysJava8Test.java
similarity index 99%
rename from sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/WithKeysJava8Test.java
rename to java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/WithKeysJava8Test.java
index c10af2903013..3771f78b4938 100644
--- a/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/WithKeysJava8Test.java
+++ b/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/WithKeysJava8Test.java
@@ -71,4 +71,3 @@ public void withLambdaAndNoTypeDescriptorShouldThrow() {
p.run();
}
}
-
diff --git a/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/WithTimestampsJava8Test.java b/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/WithTimestampsJava8Test.java
similarity index 99%
rename from sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/WithTimestampsJava8Test.java
rename to java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/WithTimestampsJava8Test.java
index 50b5ff737f76..b2b6dbc0b8e5 100644
--- a/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/WithTimestampsJava8Test.java
+++ b/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/WithTimestampsJava8Test.java
@@ -63,4 +63,3 @@ public void processElement(DoFn>.ProcessContext c)
KV.of(yearTwoThousand, new Instant(Long.valueOf(yearTwoThousand))));
}
}
-
diff --git a/pom.xml b/pom.xml
index 6fb0b32a573e..da4c24fd437f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -99,6 +99,24 @@
+
+ java8-tests
+
+ [1.8,)
+
+
+ java8tests
+
+
+
+ java8-examples
+
+ [1.8,)
+
+
+ java8examples
+
+ doclint-java8-disable
@@ -215,10 +233,6 @@
${testParallelValue}4
-
- ${project.build.directory}/${project.artifactId}-${project.version}.jar
- ${project.build.directory}/${project.artifactId}-${project.version}-tests.jar
- ${testGroups}${runIntegrationTestOnService}
diff --git a/sdk/pom.xml b/sdk/pom.xml
index d7e10a53a801..71f509729402 100644
--- a/sdk/pom.xml
+++ b/sdk/pom.xml
@@ -54,77 +54,6 @@
both
-
-
- java8tests
-
- [1.8,)
-
-
-
-
-
-
- org.codehaus.mojo
- build-helper-maven-plugin
-
-
- add-java8-test-source
- initialize
-
- add-test-source
-
-
-
- ${project.basedir}/src/test/java8
-
-
-
-
-
-
-
-
- org.apache.maven.plugins
- maven-compiler-plugin
-
-
- default-testCompile
- test-compile
-
- testCompile
-
-
- 1.7
- 1.7
-
-
- **/*Java8Test.java
-
-
-
-
-
- java8-testCompile
- test-compile
-
- testCompile
-
-
- 1.8
- 1.8
-
- **/*Java8Test.java
-
-
-
-
-
-
-
-
@@ -399,7 +328,7 @@
schemas
- generate-test-sources
+ generate-sourcesschema
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
index 29f9b85cc529..5fbbcac4114c 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
@@ -866,12 +866,13 @@ public PCollectionView apply(PCollection input) {
input.getWindowingStrategy().getWindowFn().windowCoder();
@SuppressWarnings({"rawtypes", "unchecked"})
- PCollectionView view = PCollectionViews.singletonView(
- input.getPipeline(),
- (WindowingStrategy) input.getWindowingStrategy(),
- hasDefault,
- defaultValue,
- defaultValueCoder);
+ PCollectionView view =
+ (PCollectionView) PCollectionViews.singletonView(
+ input.getPipeline(),
+ (WindowingStrategy) input.getWindowingStrategy(),
+ hasDefault,
+ defaultValue,
+ defaultValueCoder);
IsmRecordCoder> ismCoder =
coderForSingleton(windowCoder, defaultValueCoder);
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactory.java
index bde1df45e9b4..ce315be045b6 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactory.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactory.java
@@ -30,13 +30,15 @@
* {@link PTransform}.
*/
class FlattenEvaluatorFactory implements TransformEvaluatorFactory {
- @SuppressWarnings({"unchecked", "rawtypes"})
@Override
public TransformEvaluator forApplication(
AppliedPTransform, ?, ?> application,
CommittedBundle> inputBundle,
InProcessEvaluationContext evaluationContext) {
- return createInMemoryEvaluator((AppliedPTransform) application, inputBundle, evaluationContext);
+ @SuppressWarnings({"cast", "unchecked", "rawtypes"})
+ TransformEvaluator evaluator = (TransformEvaluator) createInMemoryEvaluator(
+ (AppliedPTransform) application, inputBundle, evaluationContext);
+ return evaluator;
}
private TransformEvaluator createInMemoryEvaluator(
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java
index ec63be84c9f1..dec78d606244 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java
@@ -54,13 +54,14 @@
*/
class GroupByKeyEvaluatorFactory implements TransformEvaluatorFactory {
@Override
- @SuppressWarnings({"unchecked", "rawtypes"})
public TransformEvaluator forApplication(
AppliedPTransform, ?, ?> application,
CommittedBundle> inputBundle,
InProcessEvaluationContext evaluationContext) {
- return createEvaluator(
- (AppliedPTransform) application, (CommittedBundle) inputBundle, evaluationContext);
+ @SuppressWarnings({"cast", "unchecked", "rawtypes"})
+ TransformEvaluator evaluator = (TransformEvaluator) createEvaluator(
+ (AppliedPTransform) application, (CommittedBundle) inputBundle, evaluationContext);
+ return evaluator;
}
private TransformEvaluator>> createEvaluator(
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java
index 24142c2151c9..659bdd2d32fa 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java
@@ -43,7 +43,10 @@ public TransformEvaluator forApplication(
AppliedPTransform, ?, ?> application,
CommittedBundle> inputBundle,
InProcessEvaluationContext evaluationContext) {
- return createMultiEvaluator((AppliedPTransform) application, inputBundle, evaluationContext);
+ @SuppressWarnings({"cast", "unchecked", "rawtypes"})
+ TransformEvaluator evaluator = (TransformEvaluator) createMultiEvaluator(
+ (AppliedPTransform) application, inputBundle, evaluationContext);
+ return evaluator;
}
private static ParDoInProcessEvaluator createMultiEvaluator(
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java
index af5914bab051..e9bc1f7ebde6 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java
@@ -40,7 +40,10 @@ public TransformEvaluator forApplication(
final AppliedPTransform, ?, ?> application,
CommittedBundle> inputBundle,
InProcessEvaluationContext evaluationContext) {
- return createSingleEvaluator((AppliedPTransform) application, inputBundle, evaluationContext);
+ @SuppressWarnings({"cast", "unchecked", "rawtypes"})
+ TransformEvaluator evaluator = (TransformEvaluator) createSingleEvaluator(
+ (AppliedPTransform) application, inputBundle, evaluationContext);
+ return evaluator;
}
private static ParDoInProcessEvaluator createSingleEvaluator(
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactory.java
index 314d81f6aafc..dd2bfb19496c 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactory.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactory.java
@@ -42,14 +42,15 @@
* written.
*/
class ViewEvaluatorFactory implements TransformEvaluatorFactory {
- @SuppressWarnings({"rawtypes", "unchecked"})
@Override
public TransformEvaluator forApplication(
AppliedPTransform, ?, ?> application,
InProcessPipelineRunner.CommittedBundle> inputBundle,
InProcessEvaluationContext evaluationContext) {
- return createEvaluator(
- (AppliedPTransform) application, evaluationContext);
+ @SuppressWarnings({"cast", "unchecked", "rawtypes"})
+ TransformEvaluator evaluator = (TransformEvaluator) createEvaluator(
+ (AppliedPTransform) application, evaluationContext);
+ return evaluator;
}
private TransformEvaluator> createEvaluator(
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/DoFnReflector.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/DoFnReflector.java
index 1bb05fb3405b..1c465412d781 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/DoFnReflector.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/DoFnReflector.java
@@ -625,19 +625,20 @@ private SimpleDoFnAdapter(DoFnReflector reflector, DoFnWithContext.Context c) throws Exception {
ContextAdapter adapter = new ContextAdapter<>(fn, c);
- reflector.invokeStartBundle(fn, adapter, adapter);
+ reflector.invokeStartBundle(fn, (DoFnWithContext.Context) adapter, adapter);
}
@Override
public void finishBundle(DoFn.Context c) throws Exception {
ContextAdapter adapter = new ContextAdapter<>(fn, c);
- reflector.invokeFinishBundle(fn, adapter, adapter);
+ reflector.invokeFinishBundle(fn, (DoFnWithContext.Context) adapter, adapter);
}
@Override
public void processElement(DoFn.ProcessContext c) throws Exception {
ProcessContextAdapter adapter = new ProcessContextAdapter<>(fn, c);
- reflector.invokeProcessElement(fn, adapter, adapter);
+ reflector.invokeProcessElement(
+ fn, (DoFnWithContext.ProcessContext) adapter, adapter);
}
@Override
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DoFnRunners.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DoFnRunners.java
index 64a0968e0fb2..d56b36eb5367 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DoFnRunners.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/DoFnRunners.java
@@ -115,7 +115,8 @@ public static DoFnRunner createDefault(
if (doFn instanceof ReduceFnExecutor) {
@SuppressWarnings("rawtypes")
ReduceFnExecutor fn = (ReduceFnExecutor) doFn;
- return lateDataDroppingRunner(
+ @SuppressWarnings({"unchecked", "cast", "rawtypes"})
+ DoFnRunner runner = (DoFnRunner) lateDataDroppingRunner(
options,
fn,
sideInputReader,
@@ -125,6 +126,7 @@ public static DoFnRunner createDefault(
stepContext,
addCounterMutator,
(WindowingStrategy) windowingStrategy);
+ return runner;
}
return simpleRunner(
options,