diff --git a/.travis.yml b/.travis.yml
index 74dcaf23a4028..5f000aaeff293 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -14,6 +14,13 @@ language: java
#See https://issues.apache.org/jira/browse/FLINK-1072
matrix:
include:
+ - jdk: "oraclejdk8"
+ env: PROFILE="-Dhadoop.version=2.7.2 -Dscala-2.12 -Pinclude-yarn-tests,flink-fast-tests-a,include-kinesis,jdk8 -Dmaven.javadoc.skip=true"
+ - jdk: "oraclejdk8"
+ env: PROFILE="-Dhadoop.version=2.7.2 -Dscala-2.12 -Pinclude-yarn-tests,flink-fast-tests-b,include-kinesis,jdk8 -Dmaven.javadoc.skip=true"
+ - jdk: "oraclejdk8"
+ env: PROFILE="-Dhadoop.version=2.7.2 -Dscala-2.12 -Pinclude-yarn-tests,flink-fast-tests-c,include-kinesis,jdk8 -Dmaven.javadoc.skip=true"
+
# Always run test groups A and B together
- jdk: "oraclejdk8"
env: PROFILE="-Dhadoop.version=2.7.2 -Dscala-2.11 -Pinclude-yarn-tests,flink-fast-tests-a,include-kinesis,jdk8 -Dmaven.javadoc.skip=true"
@@ -23,11 +30,18 @@ matrix:
env: PROFILE="-Dhadoop.version=2.7.2 -Dscala-2.11 -Pinclude-yarn-tests,flink-fast-tests-c,include-kinesis,jdk8 -Dmaven.javadoc.skip=true"
- jdk: "oraclejdk8"
- env: PROFILE="-Dhadoop.version=2.6.3 -Pinclude-yarn-tests,flink-fast-tests-a,include-kinesis -Dmaven.javadoc.skip=true"
+ env: PROFILE="-Dhadoop.version=2.6.3 -Dscala-2.10 -Pinclude-yarn-tests,flink-fast-tests-a,include-kinesis -Dmaven.javadoc.skip=true"
- jdk: "oraclejdk8"
- env: PROFILE="-Dhadoop.version=2.6.3 -Pinclude-yarn-tests,flink-fast-tests-b,include-kinesis -Dmaven.javadoc.skip=true"
+ env: PROFILE="-Dhadoop.version=2.6.3 -Dscala-2.10 -Pinclude-yarn-tests,flink-fast-tests-b,include-kinesis -Dmaven.javadoc.skip=true"
- jdk: "oraclejdk8"
- env: PROFILE="-Dhadoop.version=2.6.3 -Pinclude-yarn-tests,flink-fast-tests-c,include-kinesis -Dmaven.javadoc.skip=true"
+ env: PROFILE="-Dhadoop.version=2.6.3 -Dscala-2.10 -Pinclude-yarn-tests,flink-fast-tests-c,include-kinesis -Dmaven.javadoc.skip=true"
+
+ - jdk: "oraclejdk8"
+ env: PROFILE="-Dhadoop.version=2.4.1 -Dscala-2.12 -Pinclude-yarn-tests,flink-fast-tests-a,include-kinesis,jdk8"
+ - jdk: "oraclejdk8"
+ env: PROFILE="-Dhadoop.version=2.4.1 -Dscala-2.12 -Pinclude-yarn-tests,flink-fast-tests-b,include-kinesis,jdk8"
+ - jdk: "oraclejdk8"
+ env: PROFILE="-Dhadoop.version=2.4.1 -Dscala-2.12 -Pinclude-yarn-tests,flink-fast-tests-c,include-kinesis,jdk8"
- jdk: "openjdk7"
env: PROFILE="-Dhadoop.version=2.4.1 -Dscala-2.11 -Pinclude-yarn-tests,flink-fast-tests-a,include-kinesis"
@@ -37,17 +51,16 @@ matrix:
env: PROFILE="-Dhadoop.version=2.4.1 -Dscala-2.11 -Pinclude-yarn-tests,flink-fast-tests-c,include-kinesis"
- jdk: "oraclejdk7"
- env: PROFILE="-Dhadoop.version=2.3.0 -Pflink-fast-tests-a,include-kinesis"
+ env: PROFILE="-Dhadoop.version=2.3.0 -Dscala-2.10 -Pflink-fast-tests-a,include-kinesis"
- jdk: "oraclejdk7"
- env: PROFILE="-Dhadoop.version=2.3.0 -Pflink-fast-tests-b,include-kinesis"
+ env: PROFILE="-Dhadoop.version=2.3.0 -Dscala-2.10 -Pflink-fast-tests-b,include-kinesis"
- jdk: "oraclejdk7"
- env: PROFILE="-Dhadoop.version=2.3.0 -Pflink-fast-tests-c,include-kinesis"
+ env: PROFILE="-Dhadoop.version=2.3.0 -Dscala-2.10 -Pflink-fast-tests-c,include-kinesis"
git:
depth: 100
-
env:
global:
# Global variable to avoid hanging travis builds when downloading cache archives.
diff --git a/flink-clients/pom.xml b/flink-clients/pom.xml
index 0e0c1464d4b27..f17295fe816a7 100644
--- a/flink-clients/pom.xml
+++ b/flink-clients/pom.xml
@@ -76,12 +76,7 @@ under the License.
${project.version}
test
-
-
- com.data-artisans
- flakka-testkit_${scala.binary.version}
- test
-
+
+ scala-2.12
+
+
+
+
+ com.typesafe.akka
+ akka-testkit_2.12
+
+
+
+
+ scala
+
+
+
+ !scala-2.12
+
+
+
+
+ com.data-artisans
+ flakka-testkit_${scala.binary.version}
+
+
+
+
diff --git a/flink-connectors/flink-connector-kafka-0.10/pom.xml b/flink-connectors/flink-connector-kafka-0.10/pom.xml
index e061178b8488f..e398ef277b2bf 100644
--- a/flink-connectors/flink-connector-kafka-0.10/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.10/pom.xml
@@ -37,7 +37,7 @@ under the License.
-		<kafka.version>0.10.0.1</kafka.version>
+		<kafka.version>0.10.1.1</kafka.version>
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index d27e53aa59d9c..1ff548a3db214 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -20,6 +20,7 @@
import kafka.admin.AdminUtils;
import kafka.common.KafkaException;
+import kafka.metrics.KafkaMetricsReporter;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.SystemTime$;
@@ -42,6 +43,7 @@
import org.apache.kafka.common.requests.MetadataResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.collection.Seq$;
import java.io.File;
import java.net.BindException;
@@ -381,7 +383,9 @@ protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws Except
try {
				scala.Option<String> stringNone = scala.Option.apply(null);
- KafkaServer server = new KafkaServer(kafkaConfig, SystemTime$.MODULE$, stringNone);
+				scala.collection.Seq<KafkaMetricsReporter> emptyReporter =
+					Seq$.MODULE$.empty();
+ KafkaServer server = new KafkaServer(kafkaConfig, SystemTime$.MODULE$, stringNone, emptyReporter);
server.startup();
return server;
}
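
The hunk above adapts the test harness to the Kafka 0.10.1.x broker API: the KafkaServer constructor now also takes the metrics reporters to install, so the Java test builds an empty Scala Seq via Seq$.MODULE$.empty(). A minimal Scala sketch of the same call, assuming the 0.10.1.x constructor shape (config, time, threadNamePrefix, kafkaMetricsReporters); the startBroker helper is illustrative, not part of the patch:

import kafka.metrics.KafkaMetricsReporter
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.utils.SystemTime

object KafkaBrokerSketch {
  def startBroker(config: KafkaConfig): KafkaServer = {
    val server = new KafkaServer(
      config,
      SystemTime,                       // same Time singleton the Java test passes
      Option.empty[String],             // no thread-name prefix
      Seq.empty[KafkaMetricsReporter])  // the reporters argument required here
    server.startup()
    server
  }
}
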
diff --git a/flink-connectors/flink-connector-kafka-base/pom.xml b/flink-connectors/flink-connector-kafka-base/pom.xml
index fa401bd36d94a..3cb8b7cca5147 100644
--- a/flink-connectors/flink-connector-kafka-base/pom.xml
+++ b/flink-connectors/flink-connector-kafka-base/pom.xml
@@ -37,7 +37,7 @@ under the License.
-		<kafka.version>0.8.2.2</kafka.version>
+		<kafka.version>0.10.1.1</kafka.version>
diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml
index 5d8ca700f951d..98d13bac9ca0b 100644
--- a/flink-connectors/pom.xml
+++ b/flink-connectors/pom.xml
@@ -42,8 +42,6 @@ under the License.
flink-hbase
flink-hcatalog
flink-connector-kafka-base
- flink-connector-kafka-0.8
- flink-connector-kafka-0.9
flink-connector-kafka-0.10
flink-connector-elasticsearch-base
flink-connector-elasticsearch
@@ -100,6 +98,25 @@ under the License.
flink-connector-elasticsearch5
+
+
+ scala-2.12
+
+ 0.10.1.1
+
+
+
+ scala
+
+
+ !scala-2.12
+
+
+
+ flink-connector-kafka-0.8
+ flink-connector-kafka-0.9
+
+
diff --git a/flink-contrib/flink-streaming-contrib/pom.xml b/flink-contrib/flink-streaming-contrib/pom.xml
index a218f120de4c5..16cc7cb13426f 100644
--- a/flink-contrib/flink-streaming-contrib/pom.xml
+++ b/flink-contrib/flink-streaming-contrib/pom.xml
@@ -100,13 +100,6 @@ under the License.
-Xms128m
-Xmx512m
-
-
- org.scalamacros
- paradise_${scala.version}
- ${scala.macros.version}
-
-
@@ -184,4 +177,35 @@ under the License.
+
+
+
+ scala
+
+
+ !scala-2.12
+
+
+
+
+
+
+
+ net.alchim31.maven
+ scala-maven-plugin
+
+
+
+ org.scalamacros
+ paradise_${scala.version}
+ ${scala.macros.version}
+
+
+
+
+
+
+
+
+
diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala
index 159891bad4cb2..9e9330717387b 100644
--- a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala
+++ b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala
@@ -77,7 +77,7 @@ object DeltaPageRank {
(solutionSet, workset) =>
{
val deltas = workset.join(adjacency).where(0).equalTo(0) {
- (lastDeltas, adj, out: Collector[Page]) =>
+ (lastDeltas: (Long, Double), adj: (Long, Array[Long]), out: Collector[Page]) =>
{
val targets = adj._2
val deltaPerTarget = DAMPENING_FACTOR * lastDeltas._2 / targets.length
@@ -88,7 +88,7 @@ object DeltaPageRank {
}
}
.groupBy(0).sum(1)
- .filter(x => Math.abs(x._2) > THRESHOLD)
+ .filter((x: (Long, Double)) => Math.abs(x._2) > THRESHOLD)
val rankUpdates = solutionSet.join(deltas).where(0).equalTo(0) {
(current, delta) => (current._1, current._2 + delta._2)
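
Many of the Scala hunks in this patch, like the two above, follow the same pattern: the closure's parameter types are written out instead of being inferred from the call site, presumably so the literals still resolve cleanly against Flink's overloaded function-taking methods once Scala 2.12 compiles them as lambdas. A minimal, self-contained illustration of the style only (the 0.85 constant stands in for DAMPENING_FACTOR; this is not the Flink API):

object TypedClosureSketch {
  // With the parameter types ascribed, the literal type-checks on its own.
  val delta: ((Long, Double), (Long, Array[Long])) => Double =
    (lastDeltas: (Long, Double), adj: (Long, Array[Long])) =>
      0.85 * lastDeltas._2 / adj._2.length

  // Without the ascriptions the same literal needs the expected type above
  // to supply the parameter types:
  //   (lastDeltas, adj) => 0.85 * lastDeltas._2 / adj._2.length
}
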
diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala
index 1f842d5b4b96a..5bd572fb4474b 100644
--- a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala
+++ b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala
@@ -114,7 +114,7 @@ object PageRankBasic {
// collect ranks and sum them up
.groupBy("pageId").aggregate(SUM, "rank")
// apply dampening factor
- .map { p =>
+ .map { p: Page =>
Page(p.pageId, (p.rank * DAMPENING_FACTOR) + ((1 - DAMPENING_FACTOR) / numPages))
}.withForwardedFields("pageId")
diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
index 1f3a32b0a167c..1e99c7bd1f1c7 100644
--- a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
+++ b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
@@ -58,7 +58,7 @@ object TransitiveClosureNaive {
val nextPaths = prevPaths
.join(edges)
.where(1).equalTo(0) {
- (left, right) => (left._1,right._2)
+ (left: (Long, Long), right: (Long, Long)) => (left._1,right._2)
}.withForwardedFieldsFirst("_1").withForwardedFieldsSecond("_2")
.union(prevPaths)
.groupBy(0, 1)
@@ -67,7 +67,8 @@ object TransitiveClosureNaive {
val terminate = prevPaths
.coGroup(nextPaths)
.where(0).equalTo(0) {
- (prev, next, out: Collector[(Long, Long)]) => {
+ (prev: Iterator[(Long, Long)], next: Iterator[(Long, Long)],
+ out: Collector[(Long, Long)]) => {
val prevPaths = prev.toSet
for (n <- next)
if (!prevPaths.contains(n)) out.collect(n)
diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala
index 7ed39c9428112..9d48dc8e949ba 100644
--- a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala
+++ b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala
@@ -105,20 +105,22 @@ object WebLogAnalysis {
val visits = getVisitsDataSet(env, params)
val filteredDocs = documents
- .filter(doc => doc._2.contains(" editors ") && doc._2.contains(" oscillations "))
+ .filter((doc: (String, String)) =>
+ doc._2.contains(" editors ") && doc._2.contains(" oscillations "))
val filteredRanks = ranks
- .filter(rank => rank._1 > 40)
+ .filter((rank: (Int, String, Int)) => rank._1 > 40)
val filteredVisits = visits
- .filter(visit => visit._2.substring(0, 4).toInt == 2007)
+ .filter((visit: (String, String)) => visit._2.substring(0, 4).toInt == 2007)
val joinDocsRanks = filteredDocs.join(filteredRanks).where(0).equalTo(1) {
- (doc, rank) => rank
+ (doc: (String, String), rank: (Int, String, Int)) => rank
}.withForwardedFieldsSecond("*")
val result = joinDocsRanks.coGroup(filteredVisits).where(1).equalTo(0) {
- (ranks, visits, out: Collector[(Int, String, Int)]) =>
+ (ranks: Iterator[(Int, String, Int)], visits: Iterator[(String, String)],
+ out: Collector[(Int, String, Int)]) =>
if (visits.isEmpty) for (rank <- ranks) out.collect(rank)
}.withForwardedFieldsFirst("*")
diff --git a/flink-libraries/flink-gelly-examples/pom.xml b/flink-libraries/flink-gelly-examples/pom.xml
index e95aa37080a55..bf374490b0c74 100644
--- a/flink-libraries/flink-gelly-examples/pom.xml
+++ b/flink-libraries/flink-gelly-examples/pom.xml
@@ -124,16 +124,9 @@
-Xms128m
-Xmx512m
-
-
- org.scalamacros
- paradise_${scala.version}
- ${scala.macros.version}
-
-
-
+
org.apache.maven.plugins
@@ -222,4 +215,35 @@
+
+
+
+ scala
+
+
+ !scala-2.12
+
+
+
+
+
+
+
+ net.alchim31.maven
+ scala-maven-plugin
+
+
+
+ org.scalamacros
+ paradise_${scala.version}
+ ${scala.macros.version}
+
+
+
+
+
+
+
+
+
diff --git a/flink-libraries/flink-gelly-scala/pom.xml b/flink-libraries/flink-gelly-scala/pom.xml
index dad7237b5a7fc..1ab6813401bfa 100644
--- a/flink-libraries/flink-gelly-scala/pom.xml
+++ b/flink-libraries/flink-gelly-scala/pom.xml
@@ -18,194 +18,186 @@ specific language governing permissions and limitations
under the License.
-->
-
- org.apache.flink
- flink-libraries
- 1.3-SNAPSHOT
- ..
-
- 4.0.0
-
- flink-gelly-scala_2.10
- flink-gelly-scala
-
- jar
-
-
-
-
-
- org.apache.flink
- flink-scala_2.10
- ${project.version}
- provided
-
-
-
- org.apache.flink
- flink-clients_2.10
- ${project.version}
- provided
-
-
-
- org.apache.flink
- flink-gelly_2.10
- ${project.version}
-
-
-
-
-
- org.scala-lang
- scala-reflect
- provided
-
-
-
- org.scala-lang
- scala-library
- provided
-
-
-
- org.scala-lang
- scala-compiler
- provided
-
-
-
- org.ow2.asm
- asm
- ${asm.version}
- provided
-
-
-
-
-
- org.apache.flink
- flink-tests_2.10
- ${project.version}
- test
- test-jar
-
-
-
- org.apache.flink
- flink-test-utils_2.10
- ${project.version}
- test
-
-
-
-
-
-
-
- net.alchim31.maven
- scala-maven-plugin
-
-
+
+ org.apache.flink
+ flink-scala_2.10
+ ${project.version}
+ provided
+
+
+
+ org.apache.flink
+ flink-clients_2.10
+ ${project.version}
+ provided
+
+
+
+ org.apache.flink
+ flink-gelly_2.10
+ ${project.version}
+
+
+
+
+
+ org.scala-lang
+ scala-reflect
+ provided
+
+
+
+ org.scala-lang
+ scala-library
+ provided
+
+
+
+ org.scala-lang
+ scala-compiler
+ provided
+
+
+
+ org.ow2.asm
+ asm
+ ${asm.version}
+ provided
+
+
+
+
+
+ org.apache.flink
+ flink-tests_2.10
+ ${project.version}
+ test
+ test-jar
+
+
+
+ org.apache.flink
+ flink-test-utils_2.10
+ ${project.version}
+ test
+
+
+
+
+
+
+
+ net.alchim31.maven
+ scala-maven-plugin
+
+
-
- scala-compile-first
- process-resources
-
- compile
-
-
-
-
-
- scala-test-compile
- process-test-resources
-
- testCompile
-
-
-
-
-
- -Xms128m
- -Xmx512m
-
-
-
- org.scalamacros
- paradise_${scala.version}
- ${scala.macros.version}
-
-
-
-
-
-
-
- org.apache.maven.plugins
- maven-eclipse-plugin
- 2.8
-
- true
-
- org.scala-ide.sdt.core.scalanature
- org.eclipse.jdt.core.javanature
-
-
- org.scala-ide.sdt.core.scalabuilder
-
-
- org.scala-ide.sdt.launching.SCALA_CONTAINER
- org.eclipse.jdt.launching.JRE_CONTAINER
-
-
- org.scala-lang:scala-library
- org.scala-lang:scala-compiler
-
-
- **/*.scala
- **/*.java
-
-
-
-
-
-
- org.codehaus.mojo
- build-helper-maven-plugin
- 1.7
-
-
-
- add-source
- generate-sources
-
- add-source
-
-
-
- src/main/scala
-
-
-
-
-
- add-test-source
- generate-test-sources
-
- add-test-source
-
-
-
- src/test/scala
-
-
-
-
-
+
+ scala-test-compile
+ process-test-resources
+
+ testCompile
+
+
+
+
+
+ -Xms128m
+ -Xmx512m
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-eclipse-plugin
+ 2.8
+
+ true
+
+ org.scala-ide.sdt.core.scalanature
+ org.eclipse.jdt.core.javanature
+
+
+ org.scala-ide.sdt.core.scalabuilder
+
+
+ org.scala-ide.sdt.launching.SCALA_CONTAINER
+ org.eclipse.jdt.launching.JRE_CONTAINER
+
+
+ org.scala-lang:scala-library
+ org.scala-lang:scala-compiler
+
+
+ **/*.scala
+ **/*.java
+
+
+
+
+
+
+ org.codehaus.mojo
+ build-helper-maven-plugin
+ 1.7
+
+
+
+ add-source
+ generate-sources
+
+ add-source
+
+
+
+ src/main/scala
+
+
+
+
+
+ add-test-source
+ generate-test-sources
+
+ add-test-source
+
+
+
+ src/test/scala
+
+
+
+
+
@@ -216,25 +208,55 @@ under the License.
-
-
- maven-assembly-plugin
-
-
- jar-with-dependencies
-
-
-
-
- make-assembly
- package
-
- single
-
-
-
-
-
-
+
+
+ maven-assembly-plugin
+
+
+ jar-with-dependencies
+
+
+
+
+ make-assembly
+ package
+
+ single
+
+
+
+
+
+
+
+
+
+ scala
+
+
+ !scala-2.12
+
+
+
+
+
+
+
+ net.alchim31.maven
+ scala-maven-plugin
+
+
+
+ org.scalamacros
+ paradise_${scala.version}
+ ${scala.macros.version}
+
+
+
+
+
+
+
+
diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
index 27bc548d742f6..e8bc37c6cf67f 100644
--- a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
+++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
@@ -109,8 +109,8 @@ object Graph {
def fromTupleDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
TypeInformation : ClassTag](vertices: DataSet[(K, VV)], edges: DataSet[(K, K, EV)],
env: ExecutionEnvironment): Graph[K, VV, EV] = {
- val javaTupleVertices = vertices.map(v => new jtuple.Tuple2(v._1, v._2)).javaSet
- val javaTupleEdges = edges.map(v => new jtuple.Tuple3(v._1, v._2, v._3)).javaSet
+ val javaTupleVertices = vertices.map((v: (K, VV)) => new jtuple.Tuple2(v._1, v._2)).javaSet
+ val javaTupleEdges = edges.map((v: (K, K, EV)) => new jtuple.Tuple3(v._1, v._2, v._3)).javaSet
wrapGraph(jg.Graph.fromTupleDataSet[K, VV, EV](javaTupleVertices, javaTupleEdges,
env.getJavaEnv))
}
@@ -124,7 +124,7 @@ object Graph {
*/
def fromTupleDataSet[K: TypeInformation : ClassTag, EV: TypeInformation : ClassTag]
(edges: DataSet[(K, K, EV)], env: ExecutionEnvironment): Graph[K, NullValue, EV] = {
- val javaTupleEdges = edges.map(v => new jtuple.Tuple3(v._1, v._2, v._3)).javaSet
+ val javaTupleEdges = edges.map((v: (K, K, EV)) => new jtuple.Tuple3(v._1, v._2, v._3)).javaSet
wrapGraph(jg.Graph.fromTupleDataSet[K, EV](javaTupleEdges, env.getJavaEnv))
}
@@ -139,7 +139,7 @@ object Graph {
def fromTupleDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
TypeInformation : ClassTag](edges: DataSet[(K, K, EV)],
vertexValueInitializer: MapFunction[K, VV], env: ExecutionEnvironment): Graph[K, VV, EV] = {
- val javaTupleEdges = edges.map(v => new jtuple.Tuple3(v._1, v._2, v._3)).javaSet
+ val javaTupleEdges = edges.map((v: (K, K, EV)) => new jtuple.Tuple3(v._1, v._2, v._3)).javaSet
wrapGraph(jg.Graph.fromTupleDataSet[K, VV, EV](javaTupleEdges, vertexValueInitializer,
env.getJavaEnv))
}
@@ -152,7 +152,7 @@ object Graph {
*/
def fromTuple2DataSet[K: TypeInformation : ClassTag](edges: DataSet[(K, K)],
env: ExecutionEnvironment): Graph[K, NullValue, NullValue] = {
- val javaTupleEdges = edges.map(v => new jtuple.Tuple2(v._1, v._2)).javaSet
+ val javaTupleEdges = edges.map((v: (K, K)) => new jtuple.Tuple2(v._1, v._2)).javaSet
wrapGraph(jg.Graph.fromTuple2DataSet[K](javaTupleEdges, env.getJavaEnv))
}
@@ -166,7 +166,7 @@ object Graph {
def fromTuple2DataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag]
(edges: DataSet[(K, K)], vertexValueInitializer: MapFunction[K, VV],
env: ExecutionEnvironment): Graph[K, VV, NullValue] = {
- val javaTupleEdges = edges.map(v => new jtuple.Tuple2(v._1, v._2)).javaSet
+ val javaTupleEdges = edges.map((v: (K, K)) => new jtuple.Tuple2(v._1, v._2)).javaSet
wrapGraph(jg.Graph.fromTuple2DataSet[K, VV](javaTupleEdges, vertexValueInitializer,
env.getJavaEnv))
}
@@ -259,7 +259,7 @@ object Graph {
ignoreCommentsEdges,
lenientEdges,
includedFieldsEdges)
- .map(edge => (edge._1, edge._2, NullValue.getInstance))
+ .map((edge: (K, K)) => (edge._1, edge._2, NullValue.getInstance))
.asInstanceOf[DataSet[(K, K, EV)]]
} else {
env.readCsvFile[(K, K, EV)](
@@ -331,14 +331,16 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
* @return the vertex DataSet as Tuple2.
*/
def getVerticesAsTuple2(): DataSet[(K, VV)] = {
- wrap(jgraph.getVerticesAsTuple2).map(jtuple => (jtuple.f0, jtuple.f1))
+ wrap(jgraph.getVerticesAsTuple2).map((javatuple: jtuple.Tuple2[K, VV])
+ => (javatuple.f0, javatuple.f1))
}
/**
* @return the edge DataSet as Tuple3.
*/
def getEdgesAsTuple3(): DataSet[(K, K, EV)] = {
- wrap(jgraph.getEdgesAsTuple3).map(jtuple => (jtuple.f0, jtuple.f1, jtuple.f2))
+ wrap(jgraph.getEdgesAsTuple3).map((javatuple: jtuple.Tuple3[K, K, EV])
+ => (javatuple.f0, javatuple.f1, javatuple.f2))
}
/**
@@ -508,7 +510,7 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
*/
def joinWithVertices[T: TypeInformation](inputDataSet: DataSet[(K, T)],
vertexJoinFunction: VertexJoinFunction[VV, T]): Graph[K, VV, EV] = {
- val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple2(scalatuple._1,
+ val javaTupleSet = inputDataSet.map((scalatuple: (K, T)) => new jtuple.Tuple2(scalatuple._1,
scalatuple._2)).javaSet
wrapGraph(jgraph.joinWithVertices[T](javaTupleSet, vertexJoinFunction))
}
@@ -537,7 +539,7 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
cleanFun(vertexValue, inputValue)
}
}
- val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple2(scalatuple._1,
+ val javaTupleSet = inputDataSet.map((scalatuple: (K, T)) => new jtuple.Tuple2(scalatuple._1,
scalatuple._2)).javaSet
wrapGraph(jgraph.joinWithVertices[T](javaTupleSet, newVertexJoin))
}
@@ -559,8 +561,8 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
*/
def joinWithEdges[T: TypeInformation](inputDataSet: DataSet[(K, K, T)],
edgeJoinFunction: EdgeJoinFunction[EV, T]): Graph[K, VV, EV] = {
- val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple3(scalatuple._1,
- scalatuple._2, scalatuple._3)).javaSet
+ val javaTupleSet = inputDataSet.map((scalatuple: (K, K, T)) =>
+ new jtuple.Tuple3(scalatuple._1, scalatuple._2, scalatuple._3)).javaSet
wrapGraph(jgraph.joinWithEdges[T](javaTupleSet, edgeJoinFunction))
}
@@ -588,8 +590,8 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
cleanFun(edgeValue, inputValue)
}
}
- val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple3(scalatuple._1,
- scalatuple._2, scalatuple._3)).javaSet
+ val javaTupleSet = inputDataSet.map((scalatuple: (K, K, T)) =>
+ new jtuple.Tuple3(scalatuple._1, scalatuple._2, scalatuple._3)).javaSet
wrapGraph(jgraph.joinWithEdges[T](javaTupleSet, newEdgeJoin))
}
@@ -611,7 +613,7 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
*/
def joinWithEdgesOnSource[T: TypeInformation](inputDataSet: DataSet[(K, T)],
edgeJoinFunction: EdgeJoinFunction[EV, T]): Graph[K, VV, EV] = {
- val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple2(scalatuple._1,
+ val javaTupleSet = inputDataSet.map((scalatuple: (K, T)) => new jtuple.Tuple2(scalatuple._1,
scalatuple._2)).javaSet
wrapGraph(jgraph.joinWithEdgesOnSource[T](javaTupleSet, edgeJoinFunction))
}
@@ -641,7 +643,7 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
cleanFun(edgeValue, inputValue)
}
}
- val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple2(scalatuple._1,
+ val javaTupleSet = inputDataSet.map((scalatuple: (K, T)) => new jtuple.Tuple2(scalatuple._1,
scalatuple._2)).javaSet
wrapGraph(jgraph.joinWithEdgesOnSource[T](javaTupleSet, newEdgeJoin))
}
@@ -664,7 +666,7 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
*/
def joinWithEdgesOnTarget[T: TypeInformation](inputDataSet: DataSet[(K, T)],
edgeJoinFunction: EdgeJoinFunction[EV, T]): Graph[K, VV, EV] = {
- val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple2(scalatuple._1,
+ val javaTupleSet = inputDataSet.map((scalatuple: (K, T)) => new jtuple.Tuple2(scalatuple._1,
scalatuple._2)).javaSet
wrapGraph(jgraph.joinWithEdgesOnTarget[T](javaTupleSet, edgeJoinFunction))
}
@@ -694,7 +696,7 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
cleanFun(edgeValue, inputValue)
}
}
- val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple2(scalatuple._1,
+ val javaTupleSet = inputDataSet.map((scalatuple: (K, T)) => new jtuple.Tuple2(scalatuple._1,
scalatuple._2)).javaSet
wrapGraph(jgraph.joinWithEdgesOnTarget[T](javaTupleSet, newEdgeJoin))
}
@@ -799,7 +801,8 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
* @return A DataSet of Tuple2
*/
def inDegrees(): DataSet[(K, LongValue)] = {
- wrap(jgraph.inDegrees).map(javatuple => (javatuple.f0, javatuple.f1))
+ wrap(jgraph.inDegrees).map((javatuple: jtuple.Tuple2[K, LongValue]) =>
+ (javatuple.f0, javatuple.f1))
}
/**
@@ -808,7 +811,8 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
* @return A DataSet of Tuple2
*/
def outDegrees(): DataSet[(K, LongValue)] = {
- wrap(jgraph.outDegrees).map(javatuple => (javatuple.f0, javatuple.f1))
+ wrap(jgraph.outDegrees).map((javatuple: jtuple.Tuple2[K, LongValue]) =>
+ (javatuple.f0, javatuple.f1))
}
/**
@@ -817,7 +821,8 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
* @return A DataSet of Tuple2
*/
def getDegrees(): DataSet[(K, LongValue)] = {
- wrap(jgraph.getDegrees).map(javatuple => (javatuple.f0, javatuple.f1))
+ wrap(jgraph.getDegrees).map((javatuple: jtuple.Tuple2[K, LongValue]) =>
+ (javatuple.f0, javatuple.f1))
}
/**
@@ -927,7 +932,8 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
* @return The IDs of the edges as DataSet
*/
def getEdgeIds(): DataSet[(K, K)] = {
- wrap(jgraph.getEdgeIds).map(jtuple => (jtuple.f0, jtuple.f1))
+ wrap(jgraph.getEdgeIds).map((javatuple: jtuple.Tuple2[K, K]) =>
+ (javatuple.f0, javatuple.f1))
}
/**
@@ -1083,8 +1089,8 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
*/
def reduceOnNeighbors(reduceNeighborsFunction: ReduceNeighborsFunction[VV], direction:
EdgeDirection): DataSet[(K, VV)] = {
- wrap(jgraph.reduceOnNeighbors(reduceNeighborsFunction, direction)).map(jtuple => (jtuple
- .f0, jtuple.f1))
+ wrap(jgraph.reduceOnNeighbors(reduceNeighborsFunction, direction))
+ .map((javatuple: jtuple.Tuple2[K, VV]) => (javatuple.f0, javatuple.f1))
}
/**
@@ -1102,8 +1108,8 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
*/
def reduceOnEdges(reduceEdgesFunction: ReduceEdgesFunction[EV], direction: EdgeDirection):
DataSet[(K, EV)] = {
- wrap(jgraph.reduceOnEdges(reduceEdgesFunction, direction)).map(jtuple => (jtuple.f0,
- jtuple.f1))
+ wrap(jgraph.reduceOnEdges(reduceEdgesFunction, direction))
+ .map((javatuple: jtuple.Tuple2[K, EV]) => (javatuple.f0, javatuple.f1))
}
/**
diff --git a/flink-libraries/flink-ml/pom.xml b/flink-libraries/flink-ml/pom.xml
index bea956e87d48c..9f9e71047b51d 100644
--- a/flink-libraries/flink-ml/pom.xml
+++ b/flink-libraries/flink-ml/pom.xml
@@ -33,6 +33,9 @@
jar
+	<properties>
+		<breeze.version>0.13</breeze.version>
+	</properties>
@@ -47,7 +50,7 @@
org.scalanlp
breeze_${scala.binary.version}
-			<version>0.12</version>
+			<version>${breeze.version}</version>
@@ -201,4 +204,5 @@
+
diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
index 721dd69308219..a4d962c48b9d4 100644
--- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
+++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
@@ -349,7 +349,7 @@ object SVM{
// Count the number of vectors, but keep the value in a DataSet to broadcast it later
// TODO: Once efficient count and intermediate result partitions are implemented, use count
- val numberVectors = input map { x => 1 } reduce { _ + _ }
+ val numberVectors: DataSet[Int] = input map { x: LabeledVector => 1 } reduce { _ + _ }
// Group the input data into blocks in round robin fashion
val blockedInputNumberElements = FlinkMLTools.block(
@@ -357,7 +357,7 @@ object SVM{
blocks,
Some(ModuloKeyPartitioner)).
cross(numberVectors).
- map { x => x }
+ map { (x: (Block[LabeledVector], Int)) => x }
val resultingWeights = initialWeights.iterate(iterations) {
weights => {
diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkMLTools.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkMLTools.scala
index bfc72a4fcb120..104f18edf9e29 100644
--- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkMLTools.scala
+++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkMLTools.scala
@@ -376,7 +376,7 @@ object FlinkMLTools {
partitionerOption: Option[Partitioner[Int]] = None)
: DataSet[Block[T]] = {
val blockIDInput = input map {
- element =>
+ (element: T) =>
val blockID = element.hashCode() % numBlocks
val blockIDResult = if(blockID < 0){
diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/nn/KNN.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/nn/KNN.scala
index 527e6365ff627..be91d5d377763 100644
--- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/nn/KNN.scala
+++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/nn/KNN.scala
@@ -227,7 +227,8 @@ object KNN {
// join input and training set
val crossed = crossTuned.mapPartition {
- (iter, out: Collector[(FlinkVector, FlinkVector, Long, Double)]) => {
+ (iter: Iterator[(Block[FlinkVector], Block[(Long, T)])],
+ out: Collector[(FlinkVector, FlinkVector, Long, Double)]) => {
for ((training, testing) <- iter) {
// use a quadtree if (4 ^ dim) * Ntest * log(Ntrain)
// < Ntest * Ntrain, and distance is Euclidean
@@ -252,7 +253,8 @@ object KNN {
// group by input vector id and pick k nearest neighbor for each group
val result = crossed.groupBy(2).sortGroup(3, Order.ASCENDING).reduceGroup {
- (iter, out: Collector[(FlinkVector, Array[FlinkVector])]) => {
+ (iter: Iterator[(FlinkVector, FlinkVector, Long, Double)],
+ out: Collector[(FlinkVector, Array[FlinkVector])]) => {
if (iter.hasNext) {
val head = iter.next()
val key = head._2
diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/outlier/StochasticOutlierSelection.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/outlier/StochasticOutlierSelection.scala
index 2c04bb05fa4e2..3dfefbd6db243 100644
--- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/outlier/StochasticOutlierSelection.scala
+++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/outlier/StochasticOutlierSelection.scala
@@ -197,7 +197,7 @@ object StochasticOutlierSelection extends WithParameters {
val resultingParameters = instance.parameters ++ transformParameters
// Map to the right format
- val vectorsWithIndex = input.zipWithUniqueId.map(vector => {
+ val vectorsWithIndex = input.zipWithUniqueId.map((vector: (Long, T)) => {
BreezeLabeledVector(vector._1.toInt, vector._2.asBreeze)
})
diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/MinMaxScaler.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/MinMaxScaler.scala
index 217e2c26fed16..b37748fb09cf3 100644
--- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/MinMaxScaler.scala
+++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/MinMaxScaler.scala
@@ -149,7 +149,7 @@ object MinMaxScaler {
: DataSet[(linalg.Vector[Double], linalg.Vector[Double])] = {
val minMax = dataSet.map {
- v => (v.asBreeze, v.asBreeze)
+ v: T => (v.asBreeze, v.asBreeze)
}.reduce {
(minMax1, minMax2) => {
diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/PolynomialFeatures.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/PolynomialFeatures.scala
index f1c788e4fafd4..977428d953126 100644
--- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/PolynomialFeatures.scala
+++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/PolynomialFeatures.scala
@@ -116,7 +116,7 @@ object PolynomialFeatures{
val degree = resultingParameters(Degree)
input.map {
- vector => {
+ vector: T => {
calculatePolynomial(degree, vector)
}
}
diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala
index 46b14624823b0..f14d2e34353fc 100644
--- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala
+++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala
@@ -76,7 +76,7 @@ object Splitter {
}
}
- val leftSplitLight = leftSplit.map(o => (o._1, false))
+ val leftSplitLight = leftSplit.map((o: (Long, T)) => (o._1, false))
val rightSplit: DataSet[T] = indexedInput.leftOuterJoin[(Long, Boolean)](leftSplitLight)
.where(0)
@@ -87,7 +87,7 @@ object Splitter {
}
}
- Array(leftSplit.map(o => o._2), rightSplit)
+ Array(leftSplit.map((o: (Long, T)) => o._2), rightSplit)
}
// --------------------------------------------------------------------------------------------
@@ -117,14 +117,14 @@ object Splitter {
eid.reseedRandomGenerator(seed)
- val tempDS: DataSet[(Int,T)] = input.map(o => (eid.sample, o))
+ val tempDS: DataSet[(Int,T)] = input.map((o: T) => (eid.sample, o))
val splits = fracArray.length
val outputArray = new Array[DataSet[T]]( splits )
for (k <- 0 to splits-1){
- outputArray(k) = tempDS.filter(o => o._1 == k)
- .map(o => o._2)
+ outputArray(k) = tempDS.filter((o: (Int, T)) => o._1 == k)
+ .map((o: (Int, T)) => o._2)
}
outputArray
diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
index 82e8abf4df1b6..fab936f6e9c44 100644
--- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
+++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
@@ -158,7 +158,7 @@ object StandardScaler {
fitParameters: ParameterMap,
input: DataSet[(T, Double)])
: Unit = {
- val vectorDS = input.map(_._1)
+ val vectorDS = input.map((i: (T, Double)) => i._1)
val metrics = extractFeatureMetrics(vectorDS)
instance.metricsOption = Some(metrics)
@@ -180,7 +180,7 @@ object StandardScaler {
private def extractFeatureMetrics[T <: Vector](dataSet: DataSet[T])
: DataSet[(linalg.Vector[Double], linalg.Vector[Double])] = {
val metrics = dataSet.map{
- v => (1.0, v.asBreeze, linalg.Vector.zeros[Double](v.size))
+ (v: T) => (1.0, v.asBreeze, linalg.Vector.zeros[Double](v.size))
}.reduce{
(metrics1, metrics2) => {
/* We use formula 1.5b of the cited technical report for the combination of partial
diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala
index 04543813994cc..8016341b361ef 100644
--- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala
+++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala
@@ -210,7 +210,7 @@ class ALS extends Predictor[ALS] {
val predictions = data.join(userFactors, JoinHint.REPARTITION_HASH_SECOND).where(0)
.equalTo(0).join(itemFactors, JoinHint.REPARTITION_HASH_SECOND).where("_1._2")
.equalTo(0).map {
- triple => {
+ (triple: (((Long, Long), ALS.Factors), ALS.Factors)) => {
val (((uID, iID), uFactors), iFactors) = triple
val uFactorsVector = uFactors.factors
@@ -404,7 +404,7 @@ object ALS {
case Some((userFactors, itemFactors)) => {
input.join(userFactors, JoinHint.REPARTITION_HASH_SECOND).where(0).equalTo(0)
.join(itemFactors, JoinHint.REPARTITION_HASH_SECOND).where("_1._2").equalTo(0).map {
- triple => {
+ (triple: (((Long, Long), ALS.Factors), ALS.Factors)) => {
val (((uID, iID), uFactors), iFactors) = triple
val uFactorsVector = uFactors.factors
diff --git a/flink-mesos/pom.xml b/flink-mesos/pom.xml
index eea5da7792bba..35b4f99ce3c57 100644
--- a/flink-mesos/pom.xml
+++ b/flink-mesos/pom.xml
@@ -60,28 +60,6 @@ under the License.
${project.version}
-
- com.data-artisans
- flakka-actor_${scala.binary.version}
-
-
-
- com.data-artisans
- flakka-remote_${scala.binary.version}
-
-
-
- com.google.protobuf
- protobuf-java
-
-
-
-
-
- com.data-artisans
- flakka-slf4j_${scala.binary.version}
-
-
org.apache.mesos
mesos
@@ -122,12 +100,6 @@ under the License.
test
-
- com.data-artisans
- flakka-testkit_${scala.binary.version}
- test
-
-
org.apache.flink
flink-runtime_2.10
@@ -287,4 +259,65 @@ under the License.
+
+
+
+
+ scala-2.12
+
+
+
+ scala-2.12
+
+
+
+
+ com.typesafe.akka
+ akka-actor_2.12
+
+
+ com.typesafe.akka
+ akka-slf4j_2.12
+
+
+ com.typesafe.akka
+ akka-remote_2.12
+
+
+ com.typesafe.akka
+ akka-testkit_2.12
+
+
+
+
+ scala
+
+
+
+ !scala-2.12
+
+
+
+
+ com.data-artisans
+ flakka-actor_${scala.binary.version}
+
+
+
+ com.data-artisans
+ flakka-remote_${scala.binary.version}
+
+
+
+ com.data-artisans
+ flakka-slf4j_${scala.binary.version}
+
+
+
+ com.data-artisans
+ flakka-testkit_${scala.binary.version}
+
+
+
+
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
index bfe9be818579e..69262cdc68cc4 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
@@ -126,7 +126,7 @@ public int getPorts() {
}
@Override
- public Map getCustomNamedResources() {
+ public Map getCustomNamedResources() {
return Collections.emptyMap();
}
@@ -141,12 +141,12 @@ public List<? extends VMTaskFitnessCalculator> getSoftConstraints() {
}
@Override
- public void setAssignedResources(AssignedResources assignedResources) {
+ public void setAssignedResources(TaskRequest.AssignedResources assignedResources) {
this.assignedResources.set(assignedResources);
}
@Override
- public AssignedResources getAssignedResources() {
+ public TaskRequest.AssignedResources getAssignedResources() {
return assignedResources.get();
}
diff --git a/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/LaunchCoordinatorTest.scala b/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/LaunchCoordinatorTest.scala
index 34c1f66412950..363fbd7be9e23 100644
--- a/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/LaunchCoordinatorTest.scala
+++ b/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/LaunchCoordinatorTest.scala
@@ -21,29 +21,29 @@ package org.apache.flink.mesos.scheduler
import java.util.{Collections, UUID}
import java.util.concurrent.atomic.AtomicReference
+import akka.actor.ActorSystem
import akka.actor.FSM.StateTimeout
import akka.testkit._
import com.netflix.fenzo.TaskRequest.{AssignedResources, NamedResourceSetRequest}
import com.netflix.fenzo._
import com.netflix.fenzo.functions.{Action1, Action2}
import com.netflix.fenzo.plugins.VMLeaseObject
-import org.apache.flink.api.java.tuple.{Tuple2=>FlinkTuple2}
+import org.apache.flink.api.java.tuple.{Tuple2 => FlinkTuple2}
import org.apache.flink.configuration.Configuration
import org.apache.flink.mesos.scheduler.LaunchCoordinator._
import org.apache.flink.mesos.scheduler.messages._
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.mesos.Protos.{SlaveID, TaskInfo}
-import org.apache.mesos.{SchedulerDriver, Protos}
+import org.apache.mesos.{Protos, SchedulerDriver}
import org.junit.runner.RunWith
import org.mockito.Mockito.{verify, _}
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
-import org.mockito.{Matchers => MM, Mockito}
+import org.mockito.{Mockito, Matchers => MM}
import org.scalatest.junit.JUnitRunner
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
import scala.collection.JavaConverters._
-
import org.apache.flink.mesos.Utils.range
import org.apache.flink.mesos.Utils.ranges
import org.apache.flink.mesos.Utils.scalar
@@ -57,7 +57,7 @@ class LaunchCoordinatorTest
with BeforeAndAfterAll {
lazy val config = new Configuration()
- implicit lazy val system = AkkaUtils.createLocalActorSystem(config)
+ implicit lazy val system: ActorSystem = AkkaUtils.createLocalActorSystem(config)
override def afterAll(): Unit = {
TestKit.shutdownActorSystem(system)
diff --git a/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/ReconciliationCoordinatorTest.scala b/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/ReconciliationCoordinatorTest.scala
index c22385204ce16..cc3792c7994ef 100644
--- a/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/ReconciliationCoordinatorTest.scala
+++ b/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/ReconciliationCoordinatorTest.scala
@@ -20,14 +20,14 @@ package org.apache.flink.mesos.scheduler
import java.util.UUID
-import akka.actor.FSM
+import akka.actor.{ActorSystem, FSM}
import akka.testkit._
import org.apache.flink.configuration.Configuration
import org.apache.flink.mesos.Matchers._
import org.apache.flink.mesos.scheduler.messages.{Connected, Disconnected, StatusUpdate}
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.mesos.Protos.TaskState._
-import org.apache.mesos.{SchedulerDriver, Protos}
+import org.apache.mesos.{Protos, SchedulerDriver}
import org.junit.runner.RunWith
import org.mockito.Mockito
import org.mockito.Mockito._
@@ -45,7 +45,7 @@ class ReconciliationCoordinatorTest
import ReconciliationCoordinator._
lazy val config = new Configuration()
- implicit lazy val system = AkkaUtils.createLocalActorSystem(config)
+ implicit lazy val system: ActorSystem = AkkaUtils.createLocalActorSystem(config)
override def afterAll(): Unit = {
TestKit.shutdownActorSystem(system)
diff --git a/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/TaskMonitorTest.scala b/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/TaskMonitorTest.scala
index b4ef93837c1a7..01d10f91e1605 100644
--- a/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/TaskMonitorTest.scala
+++ b/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/TaskMonitorTest.scala
@@ -20,14 +20,15 @@ package org.apache.flink.mesos.scheduler
import java.util.UUID
+import akka.actor.ActorSystem
import akka.actor.FSM.StateTimeout
import akka.testkit._
import org.apache.flink.configuration.Configuration
import org.apache.flink.mesos.TestFSMUtils
import org.apache.flink.mesos.scheduler.ReconciliationCoordinator.Reconcile
-import org.apache.flink.mesos.scheduler.messages.{Disconnected, Connected, StatusUpdate}
+import org.apache.flink.mesos.scheduler.messages.{Connected, Disconnected, StatusUpdate}
import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.mesos.{SchedulerDriver, Protos}
+import org.apache.mesos.{Protos, SchedulerDriver}
import org.apache.mesos.Protos.TaskState._
import org.junit.runner.RunWith
import org.mockito.Mockito
@@ -46,7 +47,7 @@ class TaskMonitorTest
import TaskMonitor._
lazy val config = new Configuration()
- implicit lazy val system = AkkaUtils.createLocalActorSystem(config)
+ implicit lazy val system: ActorSystem = AkkaUtils.createLocalActorSystem(config)
override def afterAll(): Unit = {
TestKit.shutdownActorSystem(system)
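
The Mesos test changes above also give the implicit actor system an explicit type. A small sketch of the pattern (the fixture trait is illustrative, not Flink code): annotating implicit definitions avoids deriving their type from the right-hand side, which is the generally recommended style and sidesteps inference differences between Scala versions.

import akka.actor.ActorSystem

trait ActorSystemFixture {
  def createSystem(): ActorSystem

  // Explicit type on the implicit, as in the test changes above.
  implicit lazy val system: ActorSystem = createSystem()
}
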
diff --git a/flink-runtime-web/pom.xml b/flink-runtime-web/pom.xml
index a50e01f88902d..f491680a81de7 100644
--- a/flink-runtime-web/pom.xml
+++ b/flink-runtime-web/pom.xml
@@ -91,11 +91,6 @@ under the License.
test
-
- com.data-artisans
- flakka-testkit_${scala.binary.version}
- test
-
org.apache.flink
@@ -132,5 +127,39 @@ under the License.
+
+
+
+ scala-2.12
+
+
+
+ scala-2.12
+
+
+
+
+ com.typesafe.akka
+ akka-testkit_2.12
+
+
+
+
+ scala
+
+
+
+ !scala-2.12
+
+
+
+
+
+ com.data-artisans
+ flakka-testkit_${scala.binary.version}
+
+
+
+
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index a6b9513803ccf..2538c4c3d2ffd 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -95,22 +95,7 @@ under the License.
org.scala-lang
scala-library
-
-
- com.data-artisans
- flakka-actor_${scala.binary.version}
-
-
-
- com.data-artisans
- flakka-remote_${scala.binary.version}
-
-
-
- com.data-artisans
- flakka-slf4j_${scala.binary.version}
-
-
+
org.clapper
grizzled-slf4j_${scala.binary.version}
@@ -184,10 +169,6 @@ under the License.
test
-
- com.data-artisans
- flakka-testkit_${scala.binary.version}
-
org.reflections
@@ -395,4 +376,64 @@ under the License.
+
+
+
+ scala-2.12
+
+
+
+ scala-2.12
+
+
+
+
+ com.typesafe.akka
+ akka-actor_2.12
+
+
+ com.typesafe.akka
+ akka-slf4j_2.12
+
+
+ com.typesafe.akka
+ akka-remote_2.12
+
+
+ com.typesafe.akka
+ akka-testkit_2.12
+
+
+
+
+ scala
+
+
+
+ !scala-2.12
+
+
+
+
+ com.data-artisans
+ flakka-actor_${scala.binary.version}
+
+
+
+ com.data-artisans
+ flakka-remote_${scala.binary.version}
+
+
+
+ com.data-artisans
+ flakka-slf4j_${scala.binary.version}
+
+
+
+ com.data-artisans
+ flakka-testkit_${scala.binary.version}
+
+
+
+
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 2e8a6fa19132a..8c344d478b314 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1350,8 +1350,8 @@ class TaskManager(
accumulatorEvents.append(accumulators)
} catch {
case e: Exception =>
- log.warn("Failed to take accumulator snapshot for task {}.",
- execID, ExceptionUtils.getRootCause(e))
+ log.warn(s"Failed to take accumulator snapshot for task $execID.",
+ ExceptionUtils.getRootCause(e))
}
}
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
index a3d31f595ef34..868057d9cc348 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
@@ -68,7 +68,7 @@ trait TestingJobManagerLike extends FlinkActor {
val waitForNumRegisteredTaskManagers = mutable.PriorityQueue.newBuilder(
new Ordering[(Int, ActorRef)] {
override def compare(x: (Int, ActorRef), y: (Int, ActorRef)): Int = y._1 - x._1
- })
+ }).result()
val waitForClient = scala.collection.mutable.HashSet[ActorRef]()
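
The added .result() call reflects a standard-library change: in Scala 2.12 mutable.PriorityQueue.newBuilder returns a Builder rather than the queue itself (assuming that is the behaviour this hunk works around). A small sketch, unrelated to the JobManager code:

import scala.collection.mutable

object PriorityQueueSketch {
  val queue: mutable.PriorityQueue[(Int, String)] =
    mutable.PriorityQueue.newBuilder(Ordering.by[(Int, String), Int](_._1)).result()

  queue += ((3, "c")) += ((1, "a"))
  assert(queue.head._1 == 3)  // largest first element wins, per the ordering
}
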
diff --git a/flink-scala-shell/pom.xml b/flink-scala-shell/pom.xml
index cf637d4afcbc9..f5b288c4eb9ce 100644
--- a/flink-scala-shell/pom.xml
+++ b/flink-scala-shell/pom.xml
@@ -16,7 +16,7 @@ specific language governing permissions and limitations
under the License.
-->
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
4.0.0
@@ -39,7 +39,7 @@ under the License.
com.github.scopt
- scopt_${scala.binary.version}
+ scopt_${scala.binary.version}
@@ -121,13 +121,6 @@ under the License.
-Xms128m
-Xmx512m
-
-
- org.scalamacros
- paradise_${scala.version}
- ${scala.macros.version}
-
-
@@ -214,8 +207,7 @@ under the License.
scala-2.10
-
- !scala-2.11
+ scala-2.10
@@ -232,6 +224,33 @@ under the License.
+
+ scala
+
+
+ !scala-2.12
+
+
+
+
+
+
+ net.alchim31.maven
+ scala-maven-plugin
+
+
+
+ org.scalamacros
+ paradise_${scala.version}
+ ${scala.macros.version}
+
+
+
+
+
+
+
+
diff --git a/flink-scala-shell/src/main/scala-2.12/org/apache/flink/api/scala/ILoopCompat.scala b/flink-scala-shell/src/main/scala-2.12/org/apache/flink/api/scala/ILoopCompat.scala
new file mode 100644
index 0000000000000..a3cc7dfa7cfa7
--- /dev/null
+++ b/flink-scala-shell/src/main/scala-2.12/org/apache/flink/api/scala/ILoopCompat.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+import java.io.BufferedReader
+
+import _root_.scala.tools.nsc.interpreter._
+import _root_.scala.io.AnsiColor.{MAGENTA, RESET}
+
+class ILoopCompat(
+ in0: Option[BufferedReader],
+ out0: JPrintWriter)
+ extends ILoop(in0, out0) {
+
+ override def prompt = {
+ val promptStr = "Scala-Flink> "
+ s"$MAGENTA$promptStr$RESET"
+ }
+
+ protected def addThunk(f: => Unit): Unit = f
+}
diff --git a/flink-scala/pom.xml b/flink-scala/pom.xml
index f4c8246ddca3b..a62200b43f25f 100644
--- a/flink-scala/pom.xml
+++ b/flink-scala/pom.xml
@@ -113,7 +113,7 @@ under the License.
scala-maven-plugin
+ scala classes can be resolved later in the (Java) compile phase -->
scala-compile-first
process-resources
@@ -121,9 +121,9 @@ under the License.
compile
-
+
+ scala classes can be resolved later in the (Java) test-compile phase -->
scala-test-compile
process-test-resources
@@ -137,13 +137,6 @@ under the License.
-Xms128m
-Xmx512m
-
-
- org.scalamacros
- paradise_${scala.version}
- ${scala.macros.version}
-
-
@@ -228,8 +221,7 @@ under the License.
scala-2.10
-
- !scala-2.11
+ scala-2.10
@@ -240,6 +232,36 @@ under the License.
-
+
+ scala
+
+
+ !scala-2.12
+
+
+
+
+
+
+
+ net.alchim31.maven
+ scala-maven-plugin
+
+
+
+ org.scalamacros
+ paradise_${scala.version}
+ ${scala.macros.version}
+
+
+
+
+
+
+
+
+
+
+
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala
index 53bffffe9f424..9a5f385663b12 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala
@@ -21,12 +21,12 @@ import java.io._
import org.apache.flink.annotation.Internal
import org.apache.flink.api.common.InvalidProgramException
+import org.apache.flink.api.scala.utils.LambdaClosureCleaner
import org.apache.flink.util.InstantiationUtil
import org.slf4j.LoggerFactory
import scala.collection.mutable.Map
import scala.collection.mutable.Set
-
import org.objectweb.asm.{ClassReader, ClassVisitor, MethodVisitor, Type}
import org.objectweb.asm.Opcodes._
@@ -36,7 +36,7 @@ object ClosureCleaner {
val LOG = LoggerFactory.getLogger(this.getClass)
// Get an ASM class reader for a given class from the JAR that loaded it
- private def getClassReader(cls: Class[_]): ClassReader = {
+ def getClassReader(cls: Class[_]): ClassReader = {
// Copy data over, before delegating to ClassReader - else we can run out of open file handles.
val className = cls.getName.replaceFirst("^.*\\.", "") + ".class"
val resourceStream = cls.getResourceAsStream(className)
@@ -50,8 +50,9 @@ object ClosureCleaner {
}
// Check whether a class represents a Scala closure
+  // Since Scala 2.12, anonymous functions compile to Java lambdas ($Lambda$ classes);
+  // before 2.12 they compile to $anonfun$ classes.
private def isClosure(cls: Class[_]): Boolean = {
- cls.getName.contains("$anonfun$")
+ cls.getName.contains("$anonfun$") //|| cls.getName.contains("$Lambda$")
}
// Get a list of the classes of the outer objects of a given closure object, obj;
@@ -110,10 +111,21 @@ object ClosureCleaner {
}
def clean(func: AnyRef, checkSerializable: Boolean = true) {
+
+    if (func == null) {
+      return
+    }
+
+    if (!isClosure(func.getClass)) {
+      // Scala 2.12 lambdas (and other non-anonfun functions) are handled separately.
+      LambdaClosureCleaner.clean(func)
+      return
+    }
+
// TODO: cache outerClasses / innerClasses / accessedFields
val outerClasses = getOuterClasses(func)
val innerClasses = getInnerClasses(func)
val outerObjects = getOuterObjects(func)
+
val accessedFields = Map[Class[_], Set[String]]()
@@ -247,6 +259,28 @@ class ReturnStatementFinder extends ClassVisitor(ASM5) {
}
}
+private[flink]
+class LambdaReturnStatementFinder(targetMethodName: String) extends ClassVisitor(ASM5) {
+ override def visitMethod(
+ access: Int,
+ name: String,
+ desc: String,
+ sig: String,
+ exceptions: Array[String]): MethodVisitor = {
+ if (name == targetMethodName) {
+ new MethodVisitor(ASM5) {
+ override def visitTypeInsn(op: Int, tp: String) {
+ if (op == NEW && tp.contains("scala/runtime/NonLocalReturnControl")) {
+ throw new InvalidProgramException("Return statements aren't allowed in Flink closures")
+ }
+ }
+ }
+ } else {
+ new MethodVisitor(ASM5) {}
+ }
+ }
+}
+
@Internal
private[flink]
class FieldAccessFinder(output: Map[Class[_], Set[String]]) extends ClassVisitor(ASM5) {
@@ -262,7 +296,7 @@ class FieldAccessFinder(output: Map[Class[_], Set[String]]) extends ClassVisitor
}
override def visitMethodInsn(op: Int, owner: String, name: String,
- desc: String) {
+ desc: String, itf: Boolean) {
// Check for calls a getter method for a variable in an interpreter wrapper object.
// This means that the corresponding field will be accessed, so we should save it.
if (op == INVOKEVIRTUAL && owner.endsWith("$iwC") && !name.endsWith("$outer")) {
@@ -288,7 +322,7 @@ private[flink] class InnerClosureFinder(output: Set[Class[_]]) extends ClassVisi
sig: String, exceptions: Array[String]): MethodVisitor = {
new MethodVisitor(ASM5) {
override def visitMethodInsn(op: Int, owner: String, name: String,
- desc: String) {
+ desc: String, itf: Boolean) {
val argTypes = Type.getArgumentTypes(desc)
if (op == INVOKESPECIAL && name == "" && argTypes.nonEmpty
&& argTypes(0).toString.startsWith("L") // is it an object?
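
The cleaner changes above hinge on how the two Scala versions encode closures: before 2.12 every anonymous function is compiled to its own $anonfun$ class, while 2.12 emits invokedynamic lambdas whose runtime classes carry $Lambda$ in their names and are routed through the new LambdaClosureCleaner. A small illustration of that distinction (not part of the patch):

object ClosureShapeSketch {
  def describe(f: AnyRef): String = {
    val name = f.getClass.getName
    if (name.contains("$anonfun$")) "pre-2.12 anonymous-class closure"
    else if (name.contains("$Lambda$")) "2.12-style Java lambda"
    else "not a recognised closure encoding"
  }

  def main(args: Array[String]): Unit = {
    val inc = (x: Int) => x + 1
    // Prints the anonymous-class case on 2.11 and the lambda case on 2.12.
    println(describe(inc))
  }
}
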
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnDataSet.scala
index b2521b07a7005..9f28c3de94f43 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnDataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnDataSet.scala
@@ -20,6 +20,7 @@ package org.apache.flink.api.scala.extensions.impl.acceptPartialFunctions
import org.apache.flink.annotation.PublicEvolving
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala.{DataSet, GroupedDataSet}
+import org.apache.flink.util.Collector
import scala.reflect.ClassTag
@@ -53,7 +54,7 @@ class OnDataSet[T](ds: DataSet[T]) {
@PublicEvolving
def mapPartitionWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R] =
ds.mapPartition {
- (it, out) =>
+ (it: Iterator[T], out: Collector[R]) =>
out.collect(fun(it.toStream))
}
@@ -100,7 +101,7 @@ class OnDataSet[T](ds: DataSet[T]) {
@PublicEvolving
def reduceGroupWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R] =
ds.reduceGroup {
- (it, out) =>
+ (it: Iterator[T], out: Collector[R]) =>
out.collect(fun(it.toStream))
}
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnGroupedDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnGroupedDataSet.scala
index 07abccb423846..3636358887742 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnGroupedDataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnGroupedDataSet.scala
@@ -21,6 +21,7 @@ import org.apache.flink.annotation.PublicEvolving
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala.{DataSet, GroupedDataSet}
+import org.apache.flink.util.Collector
import scala.reflect.ClassTag
@@ -65,7 +66,7 @@ class OnGroupedDataSet[T](ds: GroupedDataSet[T]) {
@PublicEvolving
def reduceGroupWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R] =
ds.reduceGroup {
- (it, out) =>
+ (it: Iterator[T], out: Collector[R]) =>
out.collect(fun(it.toStream))
}
@@ -80,7 +81,7 @@ class OnGroupedDataSet[T](ds: GroupedDataSet[T]) {
@PublicEvolving
def combineGroupWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R] =
ds.combineGroup {
- (it, out) =>
+ (it: Iterator[T], out: Collector[R]) =>
out.collect(fun(it.toStream))
}
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/LambdaClosureCleaner.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/LambdaClosureCleaner.scala
new file mode 100644
index 0000000000000..bcca65e6c0995
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/LambdaClosureCleaner.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.utils
+
+import java.lang.reflect.Method
+
+import org.apache.flink.api.scala.{ClosureCleaner, LambdaReturnStatementFinder}
+import org.slf4j.LoggerFactory
+
+object LambdaClosureCleaner {
+
+ def getFlinkClassLoader: ClassLoader = getClass.getClassLoader
+
+ def getContextOrFlinkClassLoader: ClassLoader =
+ Option(Thread.currentThread().getContextClassLoader).getOrElse(getFlinkClassLoader)
+
+ /** Preferred alternative to Class.forName(className) */
+ def classForName(className: String): Class[_] = {
+ // scalastyle:off classforname
+ Class.forName(className, true, getContextOrFlinkClassLoader)
+ // scalastyle:on classforname
+ }
+
+ val LOG = LoggerFactory.getLogger(this.getClass)
+
+ def clean(closure: AnyRef): Unit = {
+ val writeReplaceMethod: Method = try {
+ closure.getClass.getDeclaredMethod("writeReplace")
+ } catch {
+ case e: java.lang.NoSuchMethodException =>
+ LOG.warn("Expected a Java lambda; got " + closure.getClass.getName)
+ return
+ }
+
+ writeReplaceMethod.setAccessible(true)
+ // Because we still need to support Java 7, we must use reflection here.
+ val serializedLambda: AnyRef = writeReplaceMethod.invoke(closure)
+ if (serializedLambda.getClass.getName != "java.lang.invoke.SerializedLambda") {
+ LOG.warn("Closure's writeReplace() method " +
+ s"returned ${serializedLambda.getClass.getName}, not SerializedLambda")
+ return
+ }
+
+ val serializedLambdaClass = classForName("java.lang.invoke.SerializedLambda")
+
+ val implClassName = serializedLambdaClass
+ .getDeclaredMethod("getImplClass").invoke(serializedLambda).asInstanceOf[String]
+ // TODO: we do not want to unconditionally strip this suffix.
+ val implMethodName = {
+ serializedLambdaClass
+ .getDeclaredMethod("getImplMethodName").invoke(serializedLambda).asInstanceOf[String]
+ .stripSuffix("$adapted")
+ }
+ val implMethodSignature = serializedLambdaClass
+ .getDeclaredMethod("getImplMethodSignature").invoke(serializedLambda).asInstanceOf[String]
+ val capturedArgCount = serializedLambdaClass
+ .getDeclaredMethod("getCapturedArgCount").invoke(serializedLambda).asInstanceOf[Int]
+ val capturedArgs = (0 until capturedArgCount).map { argNum: Int =>
+ serializedLambdaClass
+ .getDeclaredMethod("getCapturedArg", java.lang.Integer.TYPE)
+ .invoke(serializedLambda, argNum.asInstanceOf[Object])
+ }
+ assert(capturedArgs.size == capturedArgCount)
+ val implClass = classForName(implClassName.replaceAllLiterally("/", "."))
+
+ // Fail fast if we detect return statements in closures.
+ // TODO: match the impl method based on its type signature as well, not just its name.
+ ClosureCleaner
+ .getClassReader(implClass)
+ .accept(new LambdaReturnStatementFinder(implMethodName), 0)
+
+ // Check that the closure is serializable. TODO: make this check configurable via a flag.
+ ClosureCleaner.ensureSerializable(closure)
+ capturedArgs.foreach(ClosureCleaner.clean(_))
+
+ // TODO: null out fields to render the closure serializable?
+ }
+}
+
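
For readers unfamiliar with the mechanism the new cleaner relies on: a serializable Java 8 lambda, which is how Scala 2.12 compiles function literals, carries a synthetic writeReplace() method returning a java.lang.invoke.SerializedLambda that describes the implementing method and the values the closure captured. A minimal standalone sketch of that introspection, using the SerializedLambda type directly instead of the reflection the class above needs for Java 7 compatibility:

    import java.lang.invoke.SerializedLambda

    object SerializedLambdaDemo {
      // inspect a closure roughly the way LambdaClosureCleaner.clean() does,
      // assuming it was compiled as a serializable lambda
      def describe(closure: AnyRef): Unit = {
        val writeReplace = closure.getClass.getDeclaredMethod("writeReplace")
        writeReplace.setAccessible(true)
        writeReplace.invoke(closure) match {
          case sl: SerializedLambda =>
            println(s"impl class:    ${sl.getImplClass}")
            println(s"impl method:   ${sl.getImplMethodName}")
            println(s"captured args: ${sl.getCapturedArgCount}")
          case other =>
            println(s"not a serializable lambda: ${other.getClass.getName}")
        }
      }

      def main(args: Array[String]): Unit = {
        val base = 10
        val addBase: Int => Int = x => x + base // captures `base` from the enclosing scope
        describe(addBase)
      }
    }
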
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala
index d543998c1eb5e..9b962cf1d7421 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala
@@ -31,7 +31,7 @@ import org.apache.flink.api.java.operators.PartitionOperator
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.api.java.utils.{DataSetUtils => jutils}
import org.apache.flink.util.AbstractID
-
+import org.apache.flink.api.java.tuple.Tuple2
import _root_.scala.language.implicitConversions
import _root_.scala.reflect.ClassTag
@@ -72,7 +72,8 @@ package object utils {
BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
implicitly[TypeInformation[T]]
)
- wrap(jutils.zipWithIndex(self.javaSet)).map { t => (t.f0.toLong, t.f1) }
+ wrap(jutils.zipWithIndex(self.javaSet))
+ .map { t: Tuple2[java.lang.Long, T] => (t.f0.toLong, t.f1) }
}
/**
@@ -85,7 +86,8 @@ package object utils {
BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
implicitly[TypeInformation[T]]
)
- wrap(jutils.zipWithUniqueId(self.javaSet)).map { t => (t.f0.toLong, t.f1) }
+ wrap(jutils.zipWithUniqueId(self.javaSet))
+ .map { t: Tuple2[java.lang.Long, T] => (t.f0.toLong, t.f1) }
}
// --------------------------------------------------------------------------------------------
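
The same annotation motif appears here, with one extra detail: the Java-side utilities return Flink's Tuple2 with a boxed java.lang.Long index, so the annotated lambda unboxes it with toLong while converting to a Scala tuple. A tiny self-contained sketch of that conversion; the sample data is made up, and only org.apache.flink.api.java.tuple.Tuple2 is the real class:

    import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}

    object ZipWithIndexShape {
      def main(args: Array[String]): Unit = {
        // made-up stand-in for what zipWithIndex conceptually yields per element
        val indexed = Seq(new JTuple2(java.lang.Long.valueOf(0L), "first"),
                          new JTuple2(java.lang.Long.valueOf(1L), "second"))
        // explicit parameter type plus unboxing, mirroring the change above
        val asScala: Seq[(Long, String)] =
          indexed.map { t: JTuple2[java.lang.Long, String] => (t.f0.toLong, t.f1) }
        println(asScala) // List((0,first), (1,second))
      }
    }
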
diff --git a/flink-streaming-scala/pom.xml b/flink-streaming-scala/pom.xml
index f95ad791295f2..17e5bb4aa645a 100644
--- a/flink-streaming-scala/pom.xml
+++ b/flink-streaming-scala/pom.xml
@@ -118,13 +118,14 @@ under the License.
+
net.alchim31.maven
scala-maven-plugin
+ scala classes can be resolved later in the (Java) compile phase -->
scala-compile-first
process-resources
@@ -132,9 +133,9 @@ under the License.
compile
-
+
+ scala classes can be resolved later in the (Java) test-compile phase -->
scala-test-compile
process-test-resources
@@ -148,16 +149,9 @@ under the License.
-Xms128m
-Xmx512m
-
-
- org.scalamacros
- paradise_${scala.version}
- ${scala.macros.version}
-
-
-
+
org.apache.maven.plugins
@@ -268,4 +262,50 @@ under the License.
+
+
+
+ scala-2.10
+
+
+ scala-2.10
+
+
+
+
+ org.scalamacros
+ quasiquotes_${scala.binary.version}
+ ${scala.macros.version}
+
+
+
+
+ scala
+
+
+ !scala-2.12
+
+
+
+
+
+
+ net.alchim31.maven
+ scala-maven-plugin
+
+
+
+ org.scalamacros
+ paradise_${scala.version}
+ ${scala.macros.version}
+
+
+
+
+
+
+
+
+
+
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
index ee9f50c6b1633..7be332c27983c 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
@@ -26,7 +26,7 @@ import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.operators.OneInputStreamOperator
-import org.apache.flink.streaming.api.scala.function.{AllWindowFunction, ProcessAllWindowFunction, WindowFunction}
+import org.apache.flink.streaming.api.scala.function.{AllWindowFunction, ProcessAllWindowFunction}
import org.apache.flink.streaming.api.transformations.OneInputTransformation
import org.apache.flink.streaming.api.windowing.assigners._
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor
@@ -484,7 +484,8 @@ class AllWindowTranslationTest {
.windowAll(TumblingEventTimeWindows.of(Time.seconds(1)))
.reduce(
{ (x, _) => x },
- { (_, in, out: Collector[(String, Int)]) => in foreach { x => out.collect(x)} })
+ { (_: TimeWindow, in: Iterable[(String, Int)], out: Collector[(String, Int)]) =>
+ in foreach { x => out.collect(x)} })
val transform = window1
.javaStream
@@ -721,7 +722,7 @@ class AllWindowTranslationTest {
.windowAll(TumblingEventTimeWindows.of(Time.seconds(1)))
.aggregate(
new DummyAggregator(),
- { (_, in: Iterable[(String, Int)], out: Collector[(String, Int)]) => {
+ { (_: TimeWindow, in: Iterable[(String, Int)], out: Collector[(String, Int)]) => {
in foreach { x => out.collect(x)}
} })
@@ -1069,7 +1070,7 @@ class AllWindowTranslationTest {
.fold(
("", "", 1),
{ (acc: (String, String, Int), _) => acc },
- { (_, in: Iterable[(String, String, Int)], out: Collector[(String, Int)]) =>
+ { (_: TimeWindow, in: Iterable[(String, String, Int)], out: Collector[(String, Int)]) =>
in foreach { x => out.collect((x._1, x._3)) }
})
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
index 08153be24640b..6410ce4426b50 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
@@ -54,7 +54,7 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
val dataStream2 = env.generateSequence(0, 0).name("testSource2")
.keyBy(x=>x)
- .reduce((x, y) => 0)
+ .reduce((x, y) => 0L)
.name("testReduce")
assert("testReduce" == dataStream2.getName)
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
index 600645b9b4f2f..4d17b658b3927 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
@@ -547,7 +547,8 @@ class WindowTranslationTest {
.window(TumblingEventTimeWindows.of(Time.seconds(1)))
.reduce(
{ (x, _) => x },
- { (_, _, in, out: Collector[(String, Int)]) => in foreach { x => out.collect(x)} })
+ { (_: String, _: TimeWindow, in: Iterable[(String, Int)], out: Collector[(String, Int)])
+ => in foreach { x => out.collect(x)} })
val transform = window1
.javaStream
@@ -790,7 +791,8 @@ class WindowTranslationTest {
.keyBy(_._1)
.window(TumblingEventTimeWindows.of(Time.seconds(1)))
.aggregate(new DummyAggregator(),
- { (_, _, in: Iterable[(String, Int)], out: Collector[(String, Int)]) => {
+ { (_: String, _: TimeWindow, in: Iterable[(String, Int)], out: Collector[(String, Int)]) =>
+ {
in foreach { x => out.collect(x)}
} })
@@ -1197,7 +1199,8 @@ class WindowTranslationTest {
.fold(
("", "", 1),
{ (acc: (String, String, Int), _) => acc },
- { (_, _, in: Iterable[(String, String, Int)], out: Collector[(String, Int)]) =>
+ { (_: String, _: TimeWindow, in: Iterable[(String, String, Int)],
+ out: Collector[(String, Int)]) =>
in foreach { x => out.collect((x._1, x._3)) }
})
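
The window translation tests are the same story in a larger shape: reduce, aggregate and fold each have overloads taking either plain Scala functions or window function interfaces, so under 2.12 every parameter of the window lambda needs an annotation, not only the Collector. A shape-only sketch; TimeWindow and Collector here are placeholders, not the Flink classes:

    object WindowLambdaShape {
      // placeholders, only to show the fully annotated four-parameter shape
      trait TimeWindow
      trait Collector[R] { def collect(r: R): Unit }

      val windowFn: (String, TimeWindow, Iterable[(String, Int)], Collector[(String, Int)]) => Unit =
        (key: String, window: TimeWindow, in: Iterable[(String, Int)], out: Collector[(String, Int)]) =>
          in foreach { x => out.collect(x) }
    }
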
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index e5e3e891cf924..29954526f963a 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -166,11 +166,6 @@ under the License.
scalatest_${scala.binary.version}
test
-
-
- com.data-artisans
- flakka-testkit_${scala.binary.version}
-
joda-time
@@ -608,4 +603,39 @@ under the License.
+
+
+
+ scala-2.12
+
+
+
+ scala-2.12
+
+
+
+
+
+ com.typesafe.akka
+ akka-testkit_2.12
+
+
+
+
+
+ scala
+
+
+
+ !scala-2.12
+
+
+
+
+ com.data-artisans
+ flakka-testkit_${scala.binary.version}
+
+
+
+
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupCombineITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupCombineITCase.java
index 9b56c63a532ab..bf9c2c158bbf3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupCombineITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupCombineITCase.java
@@ -51,7 +51,7 @@
*/
public class GroupCombineITCase extends MultipleProgramsTestBase {
- public GroupCombineITCase(TestExecutionMode mode) {
+ public GroupCombineITCase(MultipleProgramsTestBase.TestExecutionMode mode) {
super(mode);
}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
index 6399e2e41b63d..abdcca836687d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
@@ -25,8 +25,8 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -34,13 +34,13 @@
import org.apache.flink.util.TestLogger;
import org.junit.Test;
import scala.concurrent.ExecutionContext$;
-import scala.concurrent.forkjoin.ForkJoinPool;
import scala.concurrent.impl.ExecutionContextImpl;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
+import java.util.concurrent.ForkJoinPool;
import static org.junit.Assert.fail;
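
Background for the import swap above: Scala 2.12 targets Java 8 and delegates to the JDK fork-join framework, leaving scala.concurrent.forkjoin.ForkJoinPool as a deprecated alias, so reflection-based assertions about the execution context should name java.util.concurrent.ForkJoinPool to behave the same on 2.11 and 2.12. A small sketch of the kind of check involved; it does not reproduce the fields the real test inspects:

    import java.util.concurrent.ForkJoinPool
    import scala.concurrent.ExecutionContext

    object ForkJoinPoolCheck {
      def main(args: Array[String]): Unit = {
        val pool = new ForkJoinPool(2)
        val ec = ExecutionContext.fromExecutorService(pool)
        // matching on the JDK class works regardless of the Scala version in use
        assert(pool.isInstanceOf[ForkJoinPool])
        ec.shutdown()
      }
    }
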
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala
index 9d04ca590bca1..2541e0734caa0 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala
@@ -65,7 +65,7 @@ class WordCountMapredITCase extends JavaProgramTestBase {
.sum(1)
val words = counts
- .map( t => (new Text(t._1), new LongWritable(t._2)) )
+ .map( (t: (String, Int)) => (new Text(t._1), new LongWritable(t._2)) )
val hadoopOutputFormat = new HadoopOutputFormat[Text, LongWritable](
new TextOutputFormat[Text, LongWritable],
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala
index 3b23a13b55fed..a2c7a13fbe331 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala
@@ -75,7 +75,7 @@ class WordCountMapreduceITCase extends JavaProgramTestBase {
.sum(1)
val words = counts
- .map( t => (new Text(t._1), new LongWritable(t._2)) )
+ .map( (t: (String, Int)) => (new Text(t._1), new LongWritable(t._2)) )
val job = Job.getInstance()
val hadoopOutputFormat = new HadoopOutputFormat[Text, LongWritable](
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala
index 79f781f9d4cfe..97bf377247165 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala
@@ -61,7 +61,7 @@ class AggregateITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(
.aggregate(Aggregations.SUM,0)
.and(Aggregations.MAX, 1)
// Ensure aggregate operator correctly copies other fields
- .filter(_._3 != null)
+ .filter((in: (Int, Long, String)) => in._3 != null)
.map{ t => (t._1, t._2) }
aggregateDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
@@ -81,7 +81,7 @@ class AggregateITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(
.groupBy(1)
.aggregate(Aggregations.SUM, 0)
// Ensure aggregate operator correctly copies other fields
- .filter(_._3 != null)
+ .filter((in: (Int, Long, String)) => in._3 != null)
.map { t => (t._2, t._1) }
aggregateDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
@@ -102,7 +102,7 @@ class AggregateITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(
.aggregate(Aggregations.MIN, 0)
.aggregate(Aggregations.MIN, 0)
// Ensure aggregate operator correctly copies other fields
- .filter(_._3 != null)
+ .filter((in: (Int, Long, String)) => in._3 != null)
.map { t => new Tuple1(t._1) }
aggregateDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala
index 448479e817148..fc4af1810685b 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala
@@ -108,7 +108,8 @@ class CoGroupITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
val ds = CollectionDataSets.get3TupleDataSet(env)
val ds2 = CollectionDataSets.get3TupleDataSet(env)
val coGroupDs = ds.coGroup(ds2).where(0).equalTo(0) {
- (first, second, out: Collector[(Int, Long, String)] ) =>
+ (first: Iterator[(Int, Long, String)], second: Iterator[(Int, Long, String)],
+ out: Collector[(Int, Long, String)] ) =>
for (t <- first) {
if (t._1 < 6) {
out.collect(t)
@@ -127,7 +128,9 @@ class CoGroupITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
val ds = CollectionDataSets.get5TupleDataSet(env)
val ds2 = CollectionDataSets.get5TupleDataSet(env)
val coGroupDs = ds.coGroup(ds2).where(0).equalTo(0) {
- (first, second, out: Collector[(Int, Long, Int, String, Long)]) =>
+ (first: Iterator[(Int, Long, Int, String, Long)],
+ second: Iterator[(Int, Long, Int, String, Long)],
+ out: Collector[(Int, Long, Int, String, Long)]) =>
for (t <- second) {
if (t._1 < 4) {
out.collect(t)
@@ -247,7 +250,8 @@ class CoGroupITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
val ds1 = CollectionDataSets.get5TupleDataSet(env)
val ds2 = CollectionDataSets.get3TupleDataSet(env)
val coGrouped = ds1.coGroup(ds2).where(0,4).equalTo(0, 1) {
- (first, second, out: Collector[(Int, Long, String)]) =>
+ (first: Iterator[(Int, Long, Int, String, Long)],
+ second: Iterator[(Int, Long, String)], out: Collector[(Int, Long, String)]) =>
val strs = first map(_._4)
for (t <- second) {
for (s <- strs) {
@@ -273,7 +277,8 @@ class CoGroupITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
val ds2 = CollectionDataSets.get3TupleDataSet(env)
val coGrouped = ds1.coGroup(ds2).where(t => (t._1, t._5)).equalTo(t => (t._1, t._2))
.apply {
- (first, second, out: Collector[(Int, Long, String)]) =>
+ (first: Iterator[(Int, Long, Int, String, Long)],
+ second: Iterator[(Int, Long, String)], out: Collector[(Int, Long, String)]) =>
val strs = first map(_._4)
for (t <- second) {
for (s <- strs) {
@@ -325,7 +330,9 @@ class CoGroupITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
val ds = CollectionDataSets.getSmallPojoDataSet(env)
val ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env)
val coGroupDs = ds.coGroup(ds2).where("nestedPojo.longNumber").equalTo(6) {
- (first, second, out: Collector[CustomType]) =>
+ (first: Iterator[CollectionDataSets.POJO],
+ second: Iterator[(Int, String, Int, Int, Long, String, Long)],
+ out: Collector[CustomType]) =>
for (p <- first) {
for (t <- second) {
Assert.assertTrue(p.nestedPojo.longNumber == t._7)
@@ -348,7 +355,9 @@ class CoGroupITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
val ds = CollectionDataSets.getSmallPojoDataSet(env)
val ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env)
val coGroupDs = ds.coGroup(ds2).where(t => new Tuple1(t.nestedPojo.longNumber)).equalTo(6) {
- (first, second, out: Collector[CustomType]) =>
+ (first: Iterator[CollectionDataSets.POJO],
+ second: Iterator[(Int, String, Int, Int, Long, String, Long)],
+ out: Collector[CustomType]) =>
for (p <- first) {
for (t <- second) {
Assert.assertTrue(p.nestedPojo.longNumber == t._7)
@@ -371,7 +380,9 @@ class CoGroupITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
val ds = CollectionDataSets.getSmallPojoDataSet(env)
val ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env)
val coGroupDs = ds.coGroup(ds2).where(_.nestedPojo.longNumber).equalTo(6) {
- (first, second, out: Collector[CustomType]) =>
+ (first: Iterator[CollectionDataSets.POJO],
+ second: Iterator[(Int, String, Int, Int, Long, String, Long)],
+ out: Collector[CustomType]) =>
for (p <- first) {
for (t <- second) {
Assert.assertTrue(p.nestedPojo.longNumber == t._7)
@@ -390,7 +401,8 @@ class CoGroupITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
val ds2 = env.fromElements(0, 1, 2)
val coGroupDs = ds1.coGroup(ds2).where(0).equalTo("*") {
- (first, second, out: Collector[(Int, Long, String)]) =>
+ (first: Iterator[(Int, Long, String)], second: Iterator[Int],
+ out: Collector[(Int, Long, String)]) =>
for (p <- first) {
for (t <- second) {
if (p._1 == t) {
@@ -411,7 +423,8 @@ class CoGroupITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo
val ds1 = env.fromElements(0, 1, 2)
val ds2 = CollectionDataSets.getSmall3TupleDataSet(env)
val coGroupDs = ds1.coGroup(ds2).where("*").equalTo(0) {
- (first, second, out: Collector[(Int, Long, String)]) =>
+ (first: Iterator[Int], second: Iterator[(Int, Long, String)],
+ out: Collector[(Int, Long, String)]) =>
for (p <- first) {
for (t <- second) {
if (p == t._1) {
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala
index 2dabb567c1f83..e2ffdbc3028c8 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala
@@ -46,7 +46,8 @@ class GroupCombineITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
ds.combineGroup(new ScalaGroupCombineFunctionExample())
.output(new DiscardingOutputFormat[Tuple1[String]])
- ds.combineGroup((in, out: Collector[Tuple1[String]]) => in.toSet foreach (out.collect))
+ ds.combineGroup((in: Iterator[Tuple1[String]], out: Collector[Tuple1[String]]) =>
+ in.toSet foreach ((t: Tuple1[String]) => out.collect(t)))
.output(new DiscardingOutputFormat[Tuple1[String]])
// all methods on UnsortedGrouping
@@ -55,7 +56,8 @@ class GroupCombineITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
.output(new DiscardingOutputFormat[Tuple1[String]])
ds.groupBy(0)
- .combineGroup((in, out: Collector[Tuple1[String]]) => in.toSet foreach (out.collect))
+ .combineGroup((in: Iterator[Tuple1[String]], out: Collector[Tuple1[String]]) =>
+ in.toSet foreach ((t: Tuple1[String]) => out.collect(t)))
.output(new DiscardingOutputFormat[Tuple1[String]])
// all methods on SortedGrouping
@@ -64,7 +66,8 @@ class GroupCombineITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
.output(new DiscardingOutputFormat[Tuple1[String]])
ds.groupBy(0).sortGroup(0, Order.ASCENDING)
- .combineGroup((in, out: Collector[Tuple1[String]]) => in.toSet foreach (out.collect))
+ .combineGroup((in: Iterator[Tuple1[String]], out: Collector[Tuple1[String]]) =>
+ in.toSet foreach ((t: Tuple1[String]) => out.collect(t)))
.output(new DiscardingOutputFormat[Tuple1[String]])
env.execute()
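
One more variant of the theme shows up here: besides the annotated (in, out) parameters, the method value out.collect passed to foreach is expanded into an explicit, annotated lambda, presumably because toSet's widened element type (its type parameter is a supertype bound) plus an unannotated method value gives 2.12's inference too little to work with. A minimal sketch of the explicit form, with a hypothetical Collector stand-in:

    object CombineShape {
      // hypothetical stand-in for Flink's Collector
      trait Collector[R] { def collect(r: R): Unit }

      // the explicit form used above: annotated parameters and an annotated element lambda
      def combine(in: Iterator[Tuple1[String]], out: Collector[Tuple1[String]]): Unit =
        in.toSet foreach ((t: Tuple1[String]) => out.collect(t))
    }
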
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala
index 385691eb6dbc8..ac119a7db2cd8 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala
@@ -23,13 +23,12 @@ import org.apache.flink.api.common.functions._
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.scala.util.CollectionDataSets.{MutableTuple3, CustomType}
+import org.apache.flink.api.scala.util.CollectionDataSets._
import org.apache.flink.optimizer.Optimizer
import org.apache.flink.configuration.Configuration
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.MultipleProgramsTestBase
import org.apache.flink.util.Collector
-
import org.junit.Test
import org.junit.Assert._
import org.junit.Assume._
@@ -265,11 +264,12 @@ class GroupReduceITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
@Test
def testCorrectnessOfGroupReduceIfUDFReturnsInputObjectMultipleTimesWhileChangingIt(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.get3TupleDataSet(env)
+ val ds: DataSet[MutableTuple3[Int, Long, String]] = CollectionDataSets.get3TupleDataSet(env)
.map( t => MutableTuple3(t._1, t._2, t._3) )
val reduceDs = ds.groupBy(1).reduceGroup {
- (in, out: Collector[MutableTuple3[Int, Long, String]]) =>
+ (in: Iterator[MutableTuple3[Int, Long, String]],
+ out: Collector[MutableTuple3[Int, Long, String]]) =>
for (t <- in) {
if (t._1 < 4) {
t._3 = "Hi!"
@@ -488,7 +488,8 @@ class GroupReduceITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
val ds = CollectionDataSets.getPojoContainingTupleAndWritable(env)
val reduceDs = ds.groupBy("hadoopFan", "theTuple.*").reduceGroup {
- (values, out: Collector[Int]) => {
+ (values: Iterator[CollectionDataSets.PojoContainingTupleAndWritable],
+ out: Collector[Int]) => {
var c: Int = 0
for (v <- values) {
c += 1
@@ -511,7 +512,7 @@ class GroupReduceITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
val ds = CollectionDataSets.getTupleContainingPojos(env)
val reduceDs = ds.groupBy("_1", "_2.*").reduceGroup {
- (values, out: Collector[Int]) => {
+ (values: Iterator[(Int, CrazyNested, POJO)], out: Collector[Int]) => {
out.collect(values.size)
}
}
@@ -643,7 +644,7 @@ class GroupReduceITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
.sortGroup("theTuple._1", Order.DESCENDING)
.sortGroup("theTuple._2", Order.DESCENDING)
.reduceGroup {
- (values, out: Collector[String]) => {
+ (values: Iterator[PojoContainingTupleAndWritable], out: Collector[String]) => {
var once: Boolean = false
val concat: StringBuilder = new StringBuilder
for (value <- values) {
@@ -803,7 +804,7 @@ class GroupReduceITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
val ds = CollectionDataSets.getPojoWithMultiplePojos(env)
val reduceDs = ds.groupBy("p2.a2").reduceGroup {
- (values, out: Collector[String]) => {
+ (values: Iterator[CollectionDataSets.PojoWithMultiplePojos], out: Collector[String]) => {
val concat: StringBuilder = new StringBuilder()
for (value <- values) {
concat.append(value.p2.a2)
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala
index d94f09921dd70..22a833456c9bb 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala
@@ -43,7 +43,7 @@ class SumMinMaxITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(
.sum(0)
.andMax(1)
// Ensure aggregate operator correctly copies other fields
- .filter(_._3 != null)
+ .filter((tup: (Int, Long, String)) => tup._3 != null)
.map{ t => (t._1, t._2) }
@@ -63,7 +63,7 @@ class SumMinMaxITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(
.groupBy(1)
.sum(0)
// Ensure aggregate operator correctly copies other fields
- .filter(_._3 != null)
+ .filter((tup: (Int, Long, String)) => tup._3 != null)
.map { t => (t._2, t._1) }
val result : Seq[(Long, Int)] = aggregateDs.collect().sortWith((a, b) => a._1 < b._1)
@@ -83,7 +83,7 @@ class SumMinMaxITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(
.min(0)
.min(0)
// Ensure aggregate operator correctly copies other fields
- .filter(_._3 != null)
+ .filter((tup: (Int, Long, String)) => tup._3 != null)
.map { t => t._1 }
val result: Seq[Int] = aggregateDs.collect()
diff --git a/flink-yarn/pom.xml b/flink-yarn/pom.xml
index 11d238ace301e..082744bfc1aeb 100644
--- a/flink-yarn/pom.xml
+++ b/flink-yarn/pom.xml
@@ -67,32 +67,12 @@ under the License.
test
-
- com.data-artisans
- flakka-actor_${scala.binary.version}
-
-
-
- com.data-artisans
- flakka-remote_${scala.binary.version}
-
-
-
- com.data-artisans
- flakka-camel_${scala.binary.version}
-
-
com.google.guava
guava
${guava.version}
-
- com.data-artisans
- flakka-testkit_${scala.binary.version}
- test
-
@@ -242,4 +222,64 @@ under the License.
+
+
+
+ scala-2.12
+
+
+
+ scala-2.12
+
+
+
+
+ com.typesafe.akka
+ akka-actor_2.12
+
+
+ com.typesafe.akka
+ akka-camel_2.12
+
+
+ com.typesafe.akka
+ akka-remote_2.12
+
+
+ com.typesafe.akka
+ akka-testkit_2.12
+
+
+
+
+ scala
+
+
+
+ !scala-2.12
+
+
+
+
+ com.data-artisans
+ flakka-actor_${scala.binary.version}
+
+
+
+ com.data-artisans
+ flakka-remote_${scala.binary.version}
+
+
+
+ com.data-artisans
+ flakka-camel_${scala.binary.version}
+
+
+
+ com.data-artisans
+ flakka-testkit_${scala.binary.version}
+
+
+
+
diff --git a/pom.xml b/pom.xml
index 874bead4d93fa..0f2a318ed77ff 100644
--- a/pom.xml
+++ b/pom.xml
@@ -93,13 +93,17 @@ under the License.
log4j-test.properties
18.0
2.3-custom
+ 2.4.16
1.7
2.0.1
2.10.4
2.10
- 0.7.4
- 5.0.4
+ 3.0.1
+ 3.5.0
+ 1.0.2
+ 0.7.7
+ 5.2
3.4.6
2.8.0
2.7.4
@@ -313,7 +317,7 @@ under the License.
org.javassist
javassist
- 3.18.2-GA
+ 3.20.0-GA
@@ -357,7 +361,7 @@ under the License.
org.clapper
grizzled-slf4j_${scala.binary.version}
- 1.0.2
+ ${grizzled.version}
@@ -391,17 +395,45 @@ under the License.
test
+
+ com.typesafe.akka
+ akka-actor_2.12
+ ${akka.typesafe.version}
+
+
+ com.typesafe.akka
+ akka-slf4j_2.12
+ ${akka.typesafe.version}
+
+
+ com.typesafe.akka
+ akka-remote_2.12
+ ${akka.typesafe.version}
+
+
+ com.typesafe.akka
+ akka-testkit_2.12
+ ${akka.typesafe.version}
+ test
+
+
+
+ com.typesafe.akka
+ akka-camel_2.12
+ ${akka.typesafe.version}
+
+
org.scalatest
scalatest_${scala.binary.version}
- 2.2.2
+ ${scalatest.version}
test
com.github.scopt
scopt_${scala.binary.version}
- 3.5.0
+ ${scopt.version}
org.scala-lang
@@ -473,6 +505,22 @@ under the License.
+
+
+ scala-2.12
+
+
+ scala-2.12
+
+
+
+ 1.8
+ 2.12.1
+ 2.12
+ 1.3.0
+
+
+
include-yarn-tests
@@ -1317,7 +1365,7 @@ under the License.
true
- false
+ true
org.apache.flink
diff --git a/tools/change-scala-version.sh b/tools/change-scala-version.sh
index 56c48c68c66b7..1e7d20ed58ef7 100755
--- a/tools/change-scala-version.sh
+++ b/tools/change-scala-version.sh
@@ -21,7 +21,7 @@
set -e
-VALID_VERSIONS=( 2.10 2.11 )
+VALID_VERSIONS=( 2.10 2.11 2.12 )
usage() {
echo "Usage: $(basename $0) [-h|--help]
@@ -46,12 +46,17 @@ check_scala_version() {
check_scala_version "$TO_VERSION"
-if [ $TO_VERSION = "2.11" ]; then
+# TODO: this assumes the versions are always switched in a fixed order; make the "from" suffix explicit instead.
+if [ $TO_VERSION = "2.12" ]; then
FROM_SUFFIX="_2\.10"
+ TO_SUFFIX="_2\.12"
+else if [ $TO_VERSION = "2.11" ]; then
+ FROM_SUFFIX="_2\.12"
TO_SUFFIX="_2\.11"
else
FROM_SUFFIX="_2\.11"
TO_SUFFIX="_2\.10"
+ fi
fi
sed_i() {
@@ -93,6 +98,17 @@ find "$BASEDIR/flink-dist" -name 'opt.xml' -not -path '*target*' -print \
find "$BASEDIR/flink-runtime" -name 'pom.xml' -not -path '*target*' -print \
-exec bash -c "sed_i 's/\(org\.apache\.flink:flink-shaded-curator.*\)'$FROM_SUFFIX'<\/include>/\1'$TO_SUFFIX'<\/include>/g' {}" \;
+if [ "$TO_VERSION" == "2.12" ]; then
+ # set the profile activation to !scala-2.12 in parent pom, so that it activates by default
+ bash -c "sed_i 's/scala-2.12<\/name>/!scala-2.12<\/name>/g' $BASEDIR/pom.xml" \;
+ # set the profile activation in all sub modules to scala-2.12 (so that they are disabled by default)
+ find $BASEDIR/flink-* -name 'pom.xml' -not -path '*target*' -print \
+ -exec bash -c "sed_i 's/!scala-2.12<\/name>/scala-2.12<\/name>/g' {}" \;
+
+ # set the name of the shading artifact properly
+ bash -c "sed_i 's/\(shading-artifact.name>flink-shaded[a-z0-9\-]*\)'$FROM_SUFFIX'<\/shading-artifact.name>/\1'$TO_SUFFIX'<\/shading-artifact.name>/g' $BASEDIR/pom.xml" \;
+fi
+
if [ "$TO_VERSION" == "2.11" ]; then
# set the profile activation to !scala-2.11 in parent pom, so that it activates by default
bash -c "sed_i 's/scala-2.11<\/name>/!scala-2.11<\/name>/g' $BASEDIR/pom.xml" \;
diff --git a/tools/create_release_files.sh b/tools/create_release_files.sh
index f4c46735efab8..7b0610fc6bcc3 100755
--- a/tools/create_release_files.sh
+++ b/tools/create_release_files.sh
@@ -242,11 +242,16 @@ deploy_to_maven() {
cd flink
cp ../../deploysettings.xml .
+ echo "Deploying Scala 2.12 version"
+ cd tools && ./change-scala-version.sh 2.12 && cd ..
+
+ $MVN clean deploy -Prelease,docs-and-source --settings deploysettings.xml -DskipTests -Dgpg.executable=$GPG -Dgpg.keyname=$GPG_KEY -Dgpg.passphrase=$GPG_PASSPHRASE -DretryFailedDeploymentCount=10
+
echo "Deploying Scala 2.11 version"
cd tools && ./change-scala-version.sh 2.11 && cd ..
$MVN clean deploy -Prelease,docs-and-source --settings deploysettings.xml -DskipTests -Dgpg.executable=$GPG -Dgpg.keyname=$GPG_KEY -Dgpg.passphrase=$GPG_PASSPHRASE -DretryFailedDeploymentCount=10
-
+
# It is important to first deploy scala 2.11 and then scala 2.10 so that the quickstarts (which are independent of the scala version)
# depend on scala 2.10.
echo "Deploying Scala 2.10 version"