From f5bb06c72492b3ce4ab06cc1fd6de6b30cba838a Mon Sep 17 00:00:00 2001 From: Jens Kat Date: Tue, 3 Jan 2017 22:32:58 +0100 Subject: [PATCH 01/31] initial commit. --- .travis.yml | 7 ++++++- pom.xml | 14 ++++++++++++++ 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 74dcaf23a4028..088290b85234f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -14,6 +14,12 @@ language: java #See https://issues.apache.org/jira/browse/FLINK-1072 matrix: include: + - jdk: "oraclejdk8" + env: PROFILE="-Dhadoop.version=2.7.2 -Dscala-2.12 -Pinclude-yarn-tests,flink-fast-tests-a,include-kinesis -Dmaven.javadoc.skip=true" + - jdk: "oraclejdk8" + env: PROFILE="-Dhadoop.version=2.7.2 -Dscala-2.12 -Pinclude-yarn-tests,flink-fast-tests-b,include-kinesis -Dmaven.javadoc.skip=true" + + exclude: # Always run test groups A and B together - jdk: "oraclejdk8" env: PROFILE="-Dhadoop.version=2.7.2 -Dscala-2.11 -Pinclude-yarn-tests,flink-fast-tests-a,include-kinesis,jdk8 -Dmaven.javadoc.skip=true" @@ -47,7 +53,6 @@ matrix: git: depth: 100 - env: global: # Global variable to avoid hanging travis builds when downloading cache archives. diff --git a/pom.xml b/pom.xml index 874bead4d93fa..4845b12538e18 100644 --- a/pom.xml +++ b/pom.xml @@ -473,6 +473,20 @@ under the License. + + + scala-2.12 + + + !scala-2.12 + + + + 2.12.1 + 2.12 + + + include-yarn-tests From c702228558fa8d6bb66f3a344e55b879f57dcee6 Mon Sep 17 00:00:00 2001 From: Jens Kat Date: Fri, 7 Apr 2017 14:59:17 +0200 Subject: [PATCH 02/31] Merge branch 'master' of https://github.com/apache/flink into feature/scala-2.12 # Conflicts: # pom.xml --- pom.xml | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/pom.xml b/pom.xml index 4845b12538e18..62d0c48b1cdd8 100644 --- a/pom.xml +++ b/pom.xml @@ -98,7 +98,10 @@ under the License. 2.10.4 2.10 - 0.7.4 + 2.2.2 + 3.5.0 + 1.0.2 + 0.8.1 5.0.4 3.4.6 2.8.0 @@ -357,7 +360,7 @@ under the License. org.clapper grizzled-slf4j_${scala.binary.version} - 1.0.2 + ${grizzled.version} @@ -394,14 +397,14 @@ under the License. org.scalatest scalatest_${scala.binary.version} - 2.2.2 + ${scalatest.version} test com.github.scopt scopt_${scala.binary.version} - 3.5.0 + ${scopt.version} org.scala-lang @@ -484,6 +487,10 @@ under the License. 2.12.1 2.12 + 0.9.0 + 3.0.1 + 3.5.0 + 1.3.0 From 92faa20e94e12657b24d65a9d6beaf5eff8195a5 Mon Sep 17 00:00:00 2001 From: Jens Kat Date: Fri, 7 Apr 2017 15:05:55 +0200 Subject: [PATCH 03/31] Change shell scripts --- tools/change-scala-version.sh | 20 ++++++++++++++++++-- tools/create_release_files.sh | 7 ++++++- 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/tools/change-scala-version.sh b/tools/change-scala-version.sh index 56c48c68c66b7..1e7d20ed58ef7 100755 --- a/tools/change-scala-version.sh +++ b/tools/change-scala-version.sh @@ -21,7 +21,7 @@ set -e -VALID_VERSIONS=( 2.10 2.11 ) +VALID_VERSIONS=( 2.10 2.11 2.12 ) usage() { echo "Usage: $(basename $0) [-h|--help] @@ -46,12 +46,17 @@ check_scala_version() { check_scala_version "$TO_VERSION" -if [ $TO_VERSION = "2.11" ]; then +# This expects an order, fix this. 
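+# The pairs below assume a fixed rotation: the tree must currently be on
+# 2.10 when moving to 2.12, on 2.12 when moving to 2.11, and on 2.11 when
+# moving to 2.10; from any other starting version the sed calls rewrite
+# nothing. A minimal order-independent sketch, assuming the caller passes
+# the current version as a second argument (not part of the script as it
+# stands):
+#   FROM_VERSION=${2:?current scala version required}
+#   FROM_SUFFIX="_${FROM_VERSION//./\\.}"
+#   TO_SUFFIX="_${TO_VERSION//./\\.}"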
+if [ $TO_VERSION = "2.12" ]; then FROM_SUFFIX="_2\.10" + TO_SUFFIX="_2\.12" +else if [ $TO_VERSION = "2.11" ]; then + FROM_SUFFIX="_2\.12" TO_SUFFIX="_2\.11" else FROM_SUFFIX="_2\.11" TO_SUFFIX="_2\.10" + fi fi sed_i() { @@ -93,6 +98,17 @@ find "$BASEDIR/flink-dist" -name 'opt.xml' -not -path '*target*' -print \ find "$BASEDIR/flink-runtime" -name 'pom.xml' -not -path '*target*' -print \ -exec bash -c "sed_i 's/\(org\.apache\.flink:flink-shaded-curator.*\)'$FROM_SUFFIX'<\/include>/\1'$TO_SUFFIX'<\/include>/g' {}" \; +if [ "$TO_VERSION" == "2.12" ]; then + # set the profile activation to !scala-2.11 in parent pom, so that it activates by default + bash -c "sed_i 's/scala-2.12<\/name>/!scala-2.12<\/name>/g' $BASEDIR/pom.xml" \; + # set the profile activation in all sub modules to scala-2.11 (so that they are disabled by default) + find $BASEDIR/flink-* -name 'pom.xml' -not -path '*target*' -print \ + -exec bash -c "sed_i 's/!scala-2.12<\/name>/scala-2.12<\/name>/g' {}" \; + + # set the name of the shading artifact properly + bash -c "sed_i 's/\(shading-artifact.name>flink-shaded[a-z0-9\-]*\)'$FROM_SUFFIX'<\/shading-artifact.name>/\1'$TO_SUFFIX'<\/shading-artifact.name>/g' $BASEDIR/pom.xml" \; +fi + if [ "$TO_VERSION" == "2.11" ]; then # set the profile activation to !scala-2.11 in parent pom, so that it activates by default bash -c "sed_i 's/scala-2.11<\/name>/!scala-2.11<\/name>/g' $BASEDIR/pom.xml" \; diff --git a/tools/create_release_files.sh b/tools/create_release_files.sh index f4c46735efab8..7b0610fc6bcc3 100755 --- a/tools/create_release_files.sh +++ b/tools/create_release_files.sh @@ -242,11 +242,16 @@ deploy_to_maven() { cd flink cp ../../deploysettings.xml . + echo "Deploying Scala 2.12 version" + cd tools && ./change-scala-version.sh 2.12 && cd .. + + $MVN clean deploy -Prelease,docs-and-source --settings deploysettings.xml -DskipTests -Dgpg.executable=$GPG -Dgpg.keyname=$GPG_KEY -Dgpg.passphrase=$GPG_PASSPHRASE -DretryFailedDeploymentCount=10 + echo "Deploying Scala 2.11 version" cd tools && ./change-scala-version.sh 2.11 && cd .. $MVN clean deploy -Prelease,docs-and-source --settings deploysettings.xml -DskipTests -Dgpg.executable=$GPG -Dgpg.keyname=$GPG_KEY -Dgpg.passphrase=$GPG_PASSPHRASE -DretryFailedDeploymentCount=10 - + # It is important to first deploy scala 2.11 and then scala 2.10 so that the quickstarts (which are independent of the scala version) # are depending on scala 2.10. echo "Deploying Scala 2.10 version" From 80696a59cf217d36dd16311af6925d4c5ca178a5 Mon Sep 17 00:00:00 2001 From: Jens Kat Date: Fri, 7 Apr 2017 18:22:36 +0200 Subject: [PATCH 04/31] part1 --- flink-clients/pom.xml | 34 ++++- .../examples/scala/graph/DeltaPageRank.scala | 4 +- .../examples/scala/graph/PageRankBasic.scala | 2 +- .../scala/graph/TransitiveClosureNaive.scala | 5 +- .../scala/relational/WebLogAnalysis.scala | 12 +- flink-runtime-web/pom.xml | 33 ++++- flink-runtime/pom.xml | 75 +++++++--- .../runtime/taskmanager/TaskManager.scala | 4 +- .../testingUtils/TestingJobManagerLike.scala | 2 +- flink-scala/pom.xml | 138 ++++++++++++------ .../acceptPartialFunctions/OnDataSet.scala | 5 +- .../OnGroupedDataSet.scala | 5 +- .../flink/api/scala/utils/package.scala | 8 +- flink-tests/pom.xml | 33 ++++- pom.xml | 27 +++- 15 files changed, 286 insertions(+), 101 deletions(-) diff --git a/flink-clients/pom.xml b/flink-clients/pom.xml index 0e0c1464d4b27..d2ff36d08ced9 100644 --- a/flink-clients/pom.xml +++ b/flink-clients/pom.xml @@ -76,12 +76,7 @@ under the License. 
${project.version} test - - - com.data-artisans - flakka-testkit_${scala.binary.version} - test - + + !scala-2.11 + + + + + com.typesafe.akka + akka-testkit_2.12 + + + + + scala + + + com.data-artisans + flakka-testkit_${scala.binary.version} + + + + diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala index 159891bad4cb2..9e9330717387b 100644 --- a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala +++ b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala @@ -77,7 +77,7 @@ object DeltaPageRank { (solutionSet, workset) => { val deltas = workset.join(adjacency).where(0).equalTo(0) { - (lastDeltas, adj, out: Collector[Page]) => + (lastDeltas: (Long, Double), adj: (Long, Array[Long]), out: Collector[Page]) => { val targets = adj._2 val deltaPerTarget = DAMPENING_FACTOR * lastDeltas._2 / targets.length @@ -88,7 +88,7 @@ object DeltaPageRank { } } .groupBy(0).sum(1) - .filter(x => Math.abs(x._2) > THRESHOLD) + .filter((x: (Long, Double)) => Math.abs(x._2) > THRESHOLD) val rankUpdates = solutionSet.join(deltas).where(0).equalTo(0) { (current, delta) => (current._1, current._2 + delta._2) diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala index 1f842d5b4b96a..5bd572fb4474b 100644 --- a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala +++ b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala @@ -114,7 +114,7 @@ object PageRankBasic { // collect ranks and sum them up .groupBy("pageId").aggregate(SUM, "rank") // apply dampening factor - .map { p => + .map { p: Page => Page(p.pageId, (p.rank * DAMPENING_FACTOR) + ((1 - DAMPENING_FACTOR) / numPages)) }.withForwardedFields("pageId") diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala index 1f3a32b0a167c..1e99c7bd1f1c7 100644 --- a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala +++ b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala @@ -58,7 +58,7 @@ object TransitiveClosureNaive { val nextPaths = prevPaths .join(edges) .where(1).equalTo(0) { - (left, right) => (left._1,right._2) + (left: (Long, Long), right: (Long, Long)) => (left._1,right._2) }.withForwardedFieldsFirst("_1").withForwardedFieldsSecond("_2") .union(prevPaths) .groupBy(0, 1) @@ -67,7 +67,8 @@ object TransitiveClosureNaive { val terminate = prevPaths .coGroup(nextPaths) .where(0).equalTo(0) { - (prev, next, out: Collector[(Long, Long)]) => { + (prev: Iterator[(Long, Long)], next: Iterator[(Long, Long)], + out: Collector[(Long, Long)]) => { val prevPaths = prev.toSet for (n <- next) if (!prevPaths.contains(n)) out.collect(n) diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala 
b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala index 7ed39c9428112..9d48dc8e949ba 100644 --- a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala +++ b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala @@ -105,20 +105,22 @@ object WebLogAnalysis { val visits = getVisitsDataSet(env, params) val filteredDocs = documents - .filter(doc => doc._2.contains(" editors ") && doc._2.contains(" oscillations ")) + .filter((doc: (String, String)) => + doc._2.contains(" editors ") && doc._2.contains(" oscillations ")) val filteredRanks = ranks - .filter(rank => rank._1 > 40) + .filter((rank: (Int, String, Int)) => rank._1 > 40) val filteredVisits = visits - .filter(visit => visit._2.substring(0, 4).toInt == 2007) + .filter((visit: (String, String)) => visit._2.substring(0, 4).toInt == 2007) val joinDocsRanks = filteredDocs.join(filteredRanks).where(0).equalTo(1) { - (doc, rank) => rank + (doc: (String, String), rank: (Int, String, Int)) => rank }.withForwardedFieldsSecond("*") val result = joinDocsRanks.coGroup(filteredVisits).where(1).equalTo(0) { - (ranks, visits, out: Collector[(Int, String, Int)]) => + (ranks: Iterator[(Int, String, Int)], visits: Iterator[(String, String)], + out: Collector[(Int, String, Int)]) => if (visits.isEmpty) for (rank <- ranks) out.collect(rank) }.withForwardedFieldsFirst("*") diff --git a/flink-runtime-web/pom.xml b/flink-runtime-web/pom.xml index a50e01f88902d..5847096442747 100644 --- a/flink-runtime-web/pom.xml +++ b/flink-runtime-web/pom.xml @@ -91,11 +91,6 @@ under the License. test - - com.data-artisans - flakka-testkit_${scala.binary.version} - test - org.apache.flink @@ -132,5 +127,33 @@ under the License. + + + + scala-2.12 + + + + !scala-2.11 + + + + + com.typesafe.akka + akka-testkit_2.12 + + + + + scala + + + + com.data-artisans + flakka-testkit_${scala.binary.version} + + + + diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml index a6b9513803ccf..1b79bcf6fe050 100644 --- a/flink-runtime/pom.xml +++ b/flink-runtime/pom.xml @@ -95,22 +95,7 @@ under the License. org.scala-lang scala-library - - - com.data-artisans - flakka-actor_${scala.binary.version} - - - - com.data-artisans - flakka-remote_${scala.binary.version} - - - - com.data-artisans - flakka-slf4j_${scala.binary.version} - - + org.clapper grizzled-slf4j_${scala.binary.version} @@ -184,10 +169,6 @@ under the License. test - - com.data-artisans - flakka-testkit_${scala.binary.version} - org.reflections @@ -395,4 +376,58 @@ under the License. 
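+	<!-- The scala-2.12 profile below switches to the upstream Akka
+	     artifacts (akka-*_2.12, version ${akka.typesafe.version}); the
+	     default "scala" profile keeps the Flakka fork, which is only
+	     published for Scala 2.10/2.11. -->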
+ + + + scala-2.12 + + + + !scala-2.11 + + + + + com.typesafe.akka + akka-actor_2.12 + + + com.typesafe.akka + akka-slf4j_2.12 + + + com.typesafe.akka + akka-remote_2.12 + + + com.typesafe.akka + akka-testkit_2.12 + + + + + scala + + + com.data-artisans + flakka-actor_${scala.binary.version} + + + + com.data-artisans + flakka-remote_${scala.binary.version} + + + + com.data-artisans + flakka-slf4j_${scala.binary.version} + + + + com.data-artisans + flakka-testkit_${scala.binary.version} + + + + diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 2e8a6fa19132a..8c344d478b314 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -1350,8 +1350,8 @@ class TaskManager( accumulatorEvents.append(accumulators) } catch { case e: Exception => - log.warn("Failed to take accumulator snapshot for task {}.", - execID, ExceptionUtils.getRootCause(e)) + log.warn(s"Failed to take accumulator snapshot for task $execID.", + ExceptionUtils.getRootCause(e)) } } diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala index a3d31f595ef34..868057d9cc348 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala @@ -68,7 +68,7 @@ trait TestingJobManagerLike extends FlinkActor { val waitForNumRegisteredTaskManagers = mutable.PriorityQueue.newBuilder( new Ordering[(Int, ActorRef)] { override def compare(x: (Int, ActorRef), y: (Int, ActorRef)): Int = y._1 - x._1 - }) + }).result() val waitForClient = scala.collection.mutable.HashSet[ActorRef]() diff --git a/flink-scala/pom.xml b/flink-scala/pom.xml index f4c8246ddca3b..26b688eead386 100644 --- a/flink-scala/pom.xml +++ b/flink-scala/pom.xml @@ -106,46 +106,6 @@ under the License. com.github.siom79.japicmp japicmp-maven-plugin - - - - net.alchim31.maven - scala-maven-plugin - - - - scala-compile-first - process-resources - - compile - - - - - - scala-test-compile - process-test-resources - - testCompile - - - - - - -Xms128m - -Xmx512m - - - - org.scalamacros - paradise_${scala.version} - ${scala.macros.version} - - - - @@ -228,8 +188,7 @@ under the License. scala-2.10 - - !scala-2.11 + !scala-2.12 @@ -239,7 +198,100 @@ under the License. 
${scala.macros.version} + + + + + + net.alchim31.maven + scala-maven-plugin + + + + scala-compile-first + process-resources + + compile + + + + + + scala-test-compile + process-test-resources + + testCompile + + + + + + -Xms128m + -Xmx512m + + + + org.scalamacros + paradise_${scala.version} + ${scala.macros.version} + + + + + + + + + + + scala-2.12 + + + !scala-2.10 + + + + + + + + net.alchim31.maven + scala-maven-plugin + + + + scala-compile-first + process-resources + + compile + + + + + + scala-test-compile + process-test-resources + + testCompile + + + + + + -Xms128m + -Xmx512m + + + + + + + - + diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnDataSet.scala index b2521b07a7005..9f28c3de94f43 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnDataSet.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnDataSet.scala @@ -20,6 +20,7 @@ package org.apache.flink.api.scala.extensions.impl.acceptPartialFunctions import org.apache.flink.annotation.PublicEvolving import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.scala.{DataSet, GroupedDataSet} +import org.apache.flink.util.Collector import scala.reflect.ClassTag @@ -53,7 +54,7 @@ class OnDataSet[T](ds: DataSet[T]) { @PublicEvolving def mapPartitionWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R] = ds.mapPartition { - (it, out) => + (it: Iterator[T], out: Collector[R]) => out.collect(fun(it.toStream)) } @@ -100,7 +101,7 @@ class OnDataSet[T](ds: DataSet[T]) { @PublicEvolving def reduceGroupWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R] = ds.reduceGroup { - (it, out) => + (it: Iterator[T], out: Collector[R]) => out.collect(fun(it.toStream)) } diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnGroupedDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnGroupedDataSet.scala index 07abccb423846..3636358887742 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnGroupedDataSet.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/impl/acceptPartialFunctions/OnGroupedDataSet.scala @@ -21,6 +21,7 @@ import org.apache.flink.annotation.PublicEvolving import org.apache.flink.api.common.operators.Order import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.scala.{DataSet, GroupedDataSet} +import org.apache.flink.util.Collector import scala.reflect.ClassTag @@ -65,7 +66,7 @@ class OnGroupedDataSet[T](ds: GroupedDataSet[T]) { @PublicEvolving def reduceGroupWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R] = ds.reduceGroup { - (it, out) => + (it: Iterator[T], out: Collector[R]) => out.collect(fun(it.toStream)) } @@ -80,7 +81,7 @@ class OnGroupedDataSet[T](ds: GroupedDataSet[T]) { @PublicEvolving def combineGroupWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R] = ds.combineGroup { - (it, out) => + (it: Iterator[T], out: Collector[R]) => out.collect(fun(it.toStream)) } diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala index 
d543998c1eb5e..9b962cf1d7421 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala @@ -31,7 +31,7 @@ import org.apache.flink.api.java.operators.PartitionOperator import org.apache.flink.api.java.typeutils.TypeExtractor import org.apache.flink.api.java.utils.{DataSetUtils => jutils} import org.apache.flink.util.AbstractID - +import org.apache.flink.api.java.tuple.Tuple2 import _root_.scala.language.implicitConversions import _root_.scala.reflect.ClassTag @@ -72,7 +72,8 @@ package object utils { BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], implicitly[TypeInformation[T]] ) - wrap(jutils.zipWithIndex(self.javaSet)).map { t => (t.f0.toLong, t.f1) } + wrap(jutils.zipWithIndex(self.javaSet)) + .map { t: Tuple2[java.lang.Long, T] => (t.f0.toLong, t.f1) } } /** @@ -85,7 +86,8 @@ package object utils { BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], implicitly[TypeInformation[T]] ) - wrap(jutils.zipWithUniqueId(self.javaSet)).map { t => (t.f0.toLong, t.f1) } + wrap(jutils.zipWithUniqueId(self.javaSet)) + .map { t: Tuple2[java.lang.Long, T] => (t.f0.toLong, t.f1) } } // -------------------------------------------------------------------------------------------- diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml index e5e3e891cf924..3df948a1240d1 100644 --- a/flink-tests/pom.xml +++ b/flink-tests/pom.xml @@ -166,11 +166,6 @@ under the License. scalatest_${scala.binary.version} test - - - com.data-artisans - flakka-testkit_${scala.binary.version} - joda-time @@ -608,4 +603,32 @@ under the License. + + + + scala-2.12 + + + + !scala-2.11 + + + + + + com.typesafe.akka + akka-testkit_2.12 + + + + + scala + + + com.data-artisans + flakka-testkit_${scala.binary.version} + + + + diff --git a/pom.xml b/pom.xml index 62d0c48b1cdd8..8c0983e00e962 100644 --- a/pom.xml +++ b/pom.xml @@ -394,6 +394,28 @@ under the License. test + + com.typesafe.akka + akka-actor_2.12 + ${akka.typesafe.version} + + + com.typesafe.akka + akka-slf4j_2.12 + ${akka.typesafe.version} + + + com.typesafe.akka + akka-remote_2.12 + ${akka.typesafe.version} + + + com.typesafe.akka + akka-testkit_2.12 + ${akka.typesafe.version} + test + + org.scalatest scalatest_${scala.binary.version} @@ -481,12 +503,13 @@ under the License. scala-2.12 - !scala-2.12 + scala-2.12 2.12.1 2.12 + 2.4.16 0.9.0 3.0.1 3.5.0 @@ -1338,7 +1361,7 @@ under the License. 
true - false + true org.apache.flink From d6560a35bd94148c0096f5955d24a8552c55fb13 Mon Sep 17 00:00:00 2001 From: Jens Kat Date: Fri, 7 Apr 2017 20:34:06 +0200 Subject: [PATCH 05/31] flink-tests --- .../hadoop/mapred/WordCountMapredITCase.scala | 2 +- .../mapreduce/WordCountMapreduceITCase.scala | 2 +- .../api/scala/operators/AggregateITCase.scala | 6 ++-- .../api/scala/operators/CoGroupITCase.scala | 31 +++++++++++++------ .../scala/operators/GroupCombineITCase.scala | 9 ++++-- .../scala/operators/GroupReduceITCase.scala | 17 +++++----- .../api/scala/operators/SumMinMaxITCase.scala | 6 ++-- 7 files changed, 45 insertions(+), 28 deletions(-) diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala index 9d04ca590bca1..2541e0734caa0 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala @@ -65,7 +65,7 @@ class WordCountMapredITCase extends JavaProgramTestBase { .sum(1) val words = counts - .map( t => (new Text(t._1), new LongWritable(t._2)) ) + .map( (t: (String, Int)) => (new Text(t._1), new LongWritable(t._2)) ) val hadoopOutputFormat = new HadoopOutputFormat[Text, LongWritable]( new TextOutputFormat[Text, LongWritable], diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala index 3b23a13b55fed..a2c7a13fbe331 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala @@ -75,7 +75,7 @@ class WordCountMapreduceITCase extends JavaProgramTestBase { .sum(1) val words = counts - .map( t => (new Text(t._1), new LongWritable(t._2)) ) + .map( (t: (String, Int)) => (new Text(t._1), new LongWritable(t._2)) ) val job = Job.getInstance() val hadoopOutputFormat = new HadoopOutputFormat[Text, LongWritable]( diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala index 79f781f9d4cfe..97bf377247165 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala @@ -61,7 +61,7 @@ class AggregateITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase( .aggregate(Aggregations.SUM,0) .and(Aggregations.MAX, 1) // Ensure aggregate operator correctly copies other fields - .filter(_._3 != null) + .filter((in: (Int, Long, String)) => in._3 != null) .map{ t => (t._1, t._2) } aggregateDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE) @@ -81,7 +81,7 @@ class AggregateITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase( .groupBy(1) .aggregate(Aggregations.SUM, 0) // Ensure aggregate operator correctly copies other fields - .filter(_._3 != null) + .filter((in: (Int, Long, String)) => in._3 != null) .map { t => (t._2, t._1) } aggregateDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE) @@ -102,7 +102,7 @@ class AggregateITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase( 
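+      // Why these tests spell out lambda parameter types: under Scala 2.12,
+      // methods like `filter` have both a `T => Boolean` overload and a
+      // SAM-convertible `FilterFunction[T]` overload, so the parameter type
+      // of a bare lambda such as `_._3 != null` can no longer be inferred
+      // from a unique expected type. With an explicit type, the plain
+      // function overload applies directly. A sketch of the pattern:
+      //   ds.filter(_._3 != null)                             // ambiguous/uninferrable on 2.12
+      //   ds.filter((t: (Int, Long, String)) => t._3 != null) // compiles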
.aggregate(Aggregations.MIN, 0) .aggregate(Aggregations.MIN, 0) // Ensure aggregate operator correctly copies other fields - .filter(_._3 != null) + .filter((in: (Int, Long, String)) => in._3 != null) .map { t => new Tuple1(t._1) } aggregateDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE) diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala index 448479e817148..fc4af1810685b 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala @@ -108,7 +108,8 @@ class CoGroupITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo val ds = CollectionDataSets.get3TupleDataSet(env) val ds2 = CollectionDataSets.get3TupleDataSet(env) val coGroupDs = ds.coGroup(ds2).where(0).equalTo(0) { - (first, second, out: Collector[(Int, Long, String)] ) => + (first: Iterator[(Int, Long, String)], second: Iterator[(Int, Long, String)], + out: Collector[(Int, Long, String)] ) => for (t <- first) { if (t._1 < 6) { out.collect(t) @@ -127,7 +128,9 @@ class CoGroupITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo val ds = CollectionDataSets.get5TupleDataSet(env) val ds2 = CollectionDataSets.get5TupleDataSet(env) val coGroupDs = ds.coGroup(ds2).where(0).equalTo(0) { - (first, second, out: Collector[(Int, Long, Int, String, Long)]) => + (first: Iterator[(Int, Long, Int, String, Long)], + second: Iterator[(Int, Long, Int, String, Long)], + out: Collector[(Int, Long, Int, String, Long)]) => for (t <- second) { if (t._1 < 4) { out.collect(t) @@ -247,7 +250,8 @@ class CoGroupITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo val ds1 = CollectionDataSets.get5TupleDataSet(env) val ds2 = CollectionDataSets.get3TupleDataSet(env) val coGrouped = ds1.coGroup(ds2).where(0,4).equalTo(0, 1) { - (first, second, out: Collector[(Int, Long, String)]) => + (first: Iterator[(Int, Long, Int, String, Long)], + second: Iterator[(Int, Long, String)], out: Collector[(Int, Long, String)]) => val strs = first map(_._4) for (t <- second) { for (s <- strs) { @@ -273,7 +277,8 @@ class CoGroupITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo val ds2 = CollectionDataSets.get3TupleDataSet(env) val coGrouped = ds1.coGroup(ds2).where(t => (t._1, t._5)).equalTo(t => (t._1, t._2)) .apply { - (first, second, out: Collector[(Int, Long, String)]) => + (first: Iterator[(Int, Long, Int, String, Long)], + second: Iterator[(Int, Long, String)], out: Collector[(Int, Long, String)]) => val strs = first map(_._4) for (t <- second) { for (s <- strs) { @@ -325,7 +330,9 @@ class CoGroupITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo val ds = CollectionDataSets.getSmallPojoDataSet(env) val ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env) val coGroupDs = ds.coGroup(ds2).where("nestedPojo.longNumber").equalTo(6) { - (first, second, out: Collector[CustomType]) => + (first: Iterator[CollectionDataSets.POJO], + second: Iterator[(Int, String, Int, Int, Long, String, Long)], + out: Collector[CustomType]) => for (p <- first) { for (t <- second) { Assert.assertTrue(p.nestedPojo.longNumber == t._7) @@ -348,7 +355,9 @@ class CoGroupITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo val ds = CollectionDataSets.getSmallPojoDataSet(env) val ds2 = 
CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env) val coGroupDs = ds.coGroup(ds2).where(t => new Tuple1(t.nestedPojo.longNumber)).equalTo(6) { - (first, second, out: Collector[CustomType]) => + (first: Iterator[CollectionDataSets.POJO], + second: Iterator[(Int, String, Int, Int, Long, String, Long)], + out: Collector[CustomType]) => for (p <- first) { for (t <- second) { Assert.assertTrue(p.nestedPojo.longNumber == t._7) @@ -371,7 +380,9 @@ class CoGroupITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo val ds = CollectionDataSets.getSmallPojoDataSet(env) val ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env) val coGroupDs = ds.coGroup(ds2).where(_.nestedPojo.longNumber).equalTo(6) { - (first, second, out: Collector[CustomType]) => + (first: Iterator[CollectionDataSets.POJO], + second: Iterator[(Int, String, Int, Int, Long, String, Long)], + out: Collector[CustomType]) => for (p <- first) { for (t <- second) { Assert.assertTrue(p.nestedPojo.longNumber == t._7) @@ -390,7 +401,8 @@ class CoGroupITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo val ds1 = CollectionDataSets.getSmall3TupleDataSet(env) val ds2 = env.fromElements(0, 1, 2) val coGroupDs = ds1.coGroup(ds2).where(0).equalTo("*") { - (first, second, out: Collector[(Int, Long, String)]) => + (first: Iterator[(Int, Long, String)], second: Iterator[Int], + out: Collector[(Int, Long, String)]) => for (p <- first) { for (t <- second) { if (p._1 == t) { @@ -411,7 +423,8 @@ class CoGroupITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo val ds1 = env.fromElements(0, 1, 2) val ds2 = CollectionDataSets.getSmall3TupleDataSet(env) val coGroupDs = ds1.coGroup(ds2).where("*").equalTo(0) { - (first, second, out: Collector[(Int, Long, String)]) => + (first: Iterator[Int], second: Iterator[(Int, Long, String)], + out: Collector[(Int, Long, String)]) => for (p <- first) { for (t <- second) { if (p == t._1) { diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala index 2dabb567c1f83..3839ff31a6f35 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala @@ -46,7 +46,8 @@ class GroupCombineITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa ds.combineGroup(new ScalaGroupCombineFunctionExample()) .output(new DiscardingOutputFormat[Tuple1[String]]) - ds.combineGroup((in, out: Collector[Tuple1[String]]) => in.toSet foreach (out.collect)) + ds.combineGroup((in: Iterator[Tuple1[String]], out: Collector[Tuple1[String]]) => + in.toSet foreach (out.collect(_))) .output(new DiscardingOutputFormat[Tuple1[String]]) // all methods on UnsortedGrouping @@ -55,7 +56,8 @@ class GroupCombineITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa .output(new DiscardingOutputFormat[Tuple1[String]]) ds.groupBy(0) - .combineGroup((in, out: Collector[Tuple1[String]]) => in.toSet foreach (out.collect)) + .combineGroup((in: Iterator[Tuple1[String]], out: Collector[Tuple1[String]]) => + in.toSet foreach (out.collect(_))) .output(new DiscardingOutputFormat[Tuple1[String]]) // all methods on SortedGrouping @@ -64,7 +66,8 @@ class GroupCombineITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa .output(new DiscardingOutputFormat[Tuple1[String]]) ds.groupBy(0).sortGroup(0, Order.ASCENDING) - 
.combineGroup((in, out: Collector[Tuple1[String]]) => in.toSet foreach (out.collect)) + .combineGroup((in: Iterator[Tuple1[String]], out: Collector[Tuple1[String]]) => + in.toSet foreach (out.collect(_))) .output(new DiscardingOutputFormat[Tuple1[String]]) env.execute() diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala index 385691eb6dbc8..ac119a7db2cd8 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala @@ -23,13 +23,12 @@ import org.apache.flink.api.common.functions._ import org.apache.flink.api.common.operators.Order import org.apache.flink.api.scala._ import org.apache.flink.api.scala.util.CollectionDataSets -import org.apache.flink.api.scala.util.CollectionDataSets.{MutableTuple3, CustomType} +import org.apache.flink.api.scala.util.CollectionDataSets._ import org.apache.flink.optimizer.Optimizer import org.apache.flink.configuration.Configuration import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode import org.apache.flink.test.util.MultipleProgramsTestBase import org.apache.flink.util.Collector - import org.junit.Test import org.junit.Assert._ import org.junit.Assume._ @@ -265,11 +264,12 @@ class GroupReduceITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas @Test def testCorrectnessOfGroupReduceIfUDFReturnsInputObjectMultipleTimesWhileChangingIt(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env) + val ds: DataSet[MutableTuple3[Int, Long, String]] = CollectionDataSets.get3TupleDataSet(env) .map( t => MutableTuple3(t._1, t._2, t._3) ) val reduceDs = ds.groupBy(1).reduceGroup { - (in, out: Collector[MutableTuple3[Int, Long, String]]) => + (in: Iterator[MutableTuple3[Int, Long, String]], + out: Collector[MutableTuple3[Int, Long, String]]) => for (t <- in) { if (t._1 < 4) { t._3 = "Hi!" 
@@ -488,7 +488,8 @@ class GroupReduceITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas val ds = CollectionDataSets.getPojoContainingTupleAndWritable(env) val reduceDs = ds.groupBy("hadoopFan", "theTuple.*").reduceGroup { - (values, out: Collector[Int]) => { + (values: Iterator[CollectionDataSets.PojoContainingTupleAndWritable], + out: Collector[Int]) => { var c: Int = 0 for (v <- values) { c += 1 @@ -511,7 +512,7 @@ class GroupReduceITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas val ds = CollectionDataSets.getTupleContainingPojos(env) val reduceDs = ds.groupBy("_1", "_2.*").reduceGroup { - (values, out: Collector[Int]) => { + (values: Iterator[(Int, CrazyNested, POJO)], out: Collector[Int]) => { out.collect(values.size) } } @@ -643,7 +644,7 @@ class GroupReduceITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas .sortGroup("theTuple._1", Order.DESCENDING) .sortGroup("theTuple._2", Order.DESCENDING) .reduceGroup { - (values, out: Collector[String]) => { + (values: Iterator[PojoContainingTupleAndWritable], out: Collector[String]) => { var once: Boolean = false val concat: StringBuilder = new StringBuilder for (value <- values) { @@ -803,7 +804,7 @@ class GroupReduceITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas val ds = CollectionDataSets.getPojoWithMultiplePojos(env) val reduceDs = ds.groupBy("p2.a2").reduceGroup { - (values, out: Collector[String]) => { + (values: Iterator[CollectionDataSets.PojoWithMultiplePojos], out: Collector[String]) => { val concat: StringBuilder = new StringBuilder() for (value <- values) { concat.append(value.p2.a2) diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala index d94f09921dd70..22a833456c9bb 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala @@ -43,7 +43,7 @@ class SumMinMaxITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase( .sum(0) .andMax(1) // Ensure aggregate operator correctly copies other fields - .filter(_._3 != null) + .filter((tup: (Int, Long, String)) => tup._3 != null) .map{ t => (t._1, t._2) } @@ -63,7 +63,7 @@ class SumMinMaxITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase( .groupBy(1) .sum(0) // Ensure aggregate operator correctly copies other fields - .filter(_._3 != null) + .filter((tup: (Int, Long, String)) => tup._3 != null) .map { t => (t._2, t._1) } val result : Seq[(Long, Int)] = aggregateDs.collect().sortWith((a, b) => a._1 < b._1) @@ -83,7 +83,7 @@ class SumMinMaxITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase( .min(0) .min(0) // Ensure aggregate operator correctly copies other fields - .filter(_._3 != null) + .filter((tup: (Int, Long, String)) => tup._3 != null) .map { t => t._1 } val result: Seq[Int] = aggregateDs.collect() From dd69422bd3697794526e60b24de8e07ae92f229b Mon Sep 17 00:00:00 2001 From: Jens Kat Date: Fri, 7 Apr 2017 20:51:13 +0200 Subject: [PATCH 06/31] fix compile error [ERROR] /Users/jens/Development/flink/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupCombineITCase.java:54: error: not found: type TestExecutionMode [INFO] public GroupCombineITCase(TestExecutionMode mode) { maybe related to https://github.com/scala/bug/issues/10207 ? 
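The error ("not found: type ...") appears to come from scalac's joint pass
over the mixed Java/Scala test sources: the nested enum inherited from the
superclass is no longer resolved by its simple name. The change below just
qualifies the type with its declaring class; sketched here, with the
constructor body assumed from the surrounding class:

    // fails under 2.12: simple name of the inherited nested type
    public GroupCombineITCase(TestExecutionMode mode) { super(mode); }

    // compiles: qualified with the declaring class
    public GroupCombineITCase(MultipleProgramsTestBase.TestExecutionMode mode) { super(mode); }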
--- .../apache/flink/test/javaApiOperators/GroupCombineITCase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupCombineITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupCombineITCase.java index 9b56c63a532ab..bf9c2c158bbf3 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupCombineITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupCombineITCase.java @@ -51,7 +51,7 @@ */ public class GroupCombineITCase extends MultipleProgramsTestBase { - public GroupCombineITCase(TestExecutionMode mode) { + public GroupCombineITCase(MultipleProgramsTestBase.TestExecutionMode mode) { super(mode); } From 8d4e3452aa7fc836edf8252f0d951d722bbc71b8 Mon Sep 17 00:00:00 2001 From: Jens Kat Date: Fri, 7 Apr 2017 21:02:05 +0200 Subject: [PATCH 07/31] since scala 2.12 use java.util.concurrent.forkjoinpool directly --- .../test/runtime/minicluster/LocalFlinkMiniClusterITCase.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java index 6399e2e41b63d..abdcca836687d 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java @@ -25,8 +25,8 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; -import org.apache.flink.runtime.instance.AkkaActorGateway; import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.instance.AkkaActorGateway; import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.runtime.testingUtils.TestingUtils; @@ -34,13 +34,13 @@ import org.apache.flink.util.TestLogger; import org.junit.Test; import scala.concurrent.ExecutionContext$; -import scala.concurrent.forkjoin.ForkJoinPool; import scala.concurrent.impl.ExecutionContextImpl; import java.lang.reflect.Field; import java.util.Arrays; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.ForkJoinPool; import static org.junit.Assert.fail; From bf13083d93b18a1432a451a87929463c7a3ed3d8 Mon Sep 17 00:00:00 2001 From: Jens Kat Date: Fri, 7 Apr 2017 21:42:48 +0200 Subject: [PATCH 08/31] fix ambiguous methods --- flink-streaming-scala/pom.xml | 205 ++++++++++++++---- .../api/scala/AllWindowTranslationTest.scala | 9 +- .../streaming/api/scala/DataStreamTest.scala | 2 +- .../api/scala/WindowTranslationTest.scala | 9 +- flink-tests/pom.xml | 1 + 5 files changed, 179 insertions(+), 47 deletions(-) diff --git a/flink-streaming-scala/pom.xml b/flink-streaming-scala/pom.xml index f95ad791295f2..d0ba2d9a72b4a 100644 --- a/flink-streaming-scala/pom.xml +++ b/flink-streaming-scala/pom.xml @@ -118,45 +118,6 @@ under the License. 
- - - net.alchim31.maven - scala-maven-plugin - - - - scala-compile-first - process-resources - - compile - - - - - - scala-test-compile - process-test-resources - - testCompile - - - - - - -Xms128m - -Xmx512m - - - - org.scalamacros - paradise_${scala.version} - ${scala.macros.version} - - - - @@ -268,4 +229,170 @@ under the License. + + + + scala-2.10 + + + !scala-2.12 + + + + + org.scalamacros + quasiquotes_${scala.binary.version} + ${scala.macros.version} + + + + + + + + net.alchim31.maven + scala-maven-plugin + + + + scala-compile-first + process-resources + + compile + + + + + + scala-test-compile + process-test-resources + + testCompile + + + + + + -Xms128m + -Xmx512m + + + + org.scalamacros + paradise_${scala.version} + ${scala.macros.version} + + + + + + + + + + + scala-2.11 + + + !scala-2.12 + + + + + + + + net.alchim31.maven + scala-maven-plugin + + + + scala-compile-first + process-resources + + compile + + + + + + scala-test-compile + process-test-resources + + testCompile + + + + + + -Xms128m + -Xmx512m + + + + org.scalamacros + paradise_${scala.version} + ${scala.macros.version} + + + + + + + + + + + scala-2.12 + + + !scala-2.10 + + + + + + + + net.alchim31.maven + scala-maven-plugin + + + + scala-compile-first + process-resources + + compile + + + + + + scala-test-compile + process-test-resources + + testCompile + + + + + + -Xms128m + -Xmx512m + + + + + + + + + + diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala index ee9f50c6b1633..7be332c27983c 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala @@ -26,7 +26,7 @@ import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} import org.apache.flink.api.java.functions.KeySelector import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.operators.OneInputStreamOperator -import org.apache.flink.streaming.api.scala.function.{AllWindowFunction, ProcessAllWindowFunction, WindowFunction} +import org.apache.flink.streaming.api.scala.function.{AllWindowFunction, ProcessAllWindowFunction} import org.apache.flink.streaming.api.transformations.OneInputTransformation import org.apache.flink.streaming.api.windowing.assigners._ import org.apache.flink.streaming.api.windowing.evictors.CountEvictor @@ -484,7 +484,8 @@ class AllWindowTranslationTest { .windowAll(TumblingEventTimeWindows.of(Time.seconds(1))) .reduce( { (x, _) => x }, - { (_, in, out: Collector[(String, Int)]) => in foreach { x => out.collect(x)} }) + { (_: TimeWindow, in: Iterable[(String, Int)], out: Collector[(String, Int)]) => + in foreach { x => out.collect(x)} }) val transform = window1 .javaStream @@ -721,7 +722,7 @@ class AllWindowTranslationTest { .windowAll(TumblingEventTimeWindows.of(Time.seconds(1))) .aggregate( new DummyAggregator(), - { (_, in: Iterable[(String, Int)], out: Collector[(String, Int)]) => { + { (_: TimeWindow, in: Iterable[(String, Int)], out: Collector[(String, Int)]) => { in foreach { x => out.collect(x)} } }) @@ -1069,7 +1070,7 @@ class AllWindowTranslationTest { .fold( ("", "", 1), { (acc: (String, String, Int), _) => acc }, - { (_, in: Iterable[(String, String, Int)], out: Collector[(String, Int)]) => + { (_: TimeWindow, in: Iterable[(String, 
String, Int)], out: Collector[(String, Int)]) => in foreach { x => out.collect((x._1, x._3)) } }) diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala index 08153be24640b..6410ce4426b50 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala @@ -54,7 +54,7 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase { val dataStream2 = env.generateSequence(0, 0).name("testSource2") .keyBy(x=>x) - .reduce((x, y) => 0) + .reduce((x, y) => 0L) .name("testReduce") assert("testReduce" == dataStream2.getName) diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala index 600645b9b4f2f..4d17b658b3927 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala @@ -547,7 +547,8 @@ class WindowTranslationTest { .window(TumblingEventTimeWindows.of(Time.seconds(1))) .reduce( { (x, _) => x }, - { (_, _, in, out: Collector[(String, Int)]) => in foreach { x => out.collect(x)} }) + { (_: String, _: TimeWindow, in: Iterable[(String, Int)], out: Collector[(String, Int)]) + => in foreach { x => out.collect(x)} }) val transform = window1 .javaStream @@ -790,7 +791,8 @@ class WindowTranslationTest { .keyBy(_._1) .window(TumblingEventTimeWindows.of(Time.seconds(1))) .aggregate(new DummyAggregator(), - { (_, _, in: Iterable[(String, Int)], out: Collector[(String, Int)]) => { + { (_: String, _: TimeWindow, in: Iterable[(String, Int)], out: Collector[(String, Int)]) => + { in foreach { x => out.collect(x)} } }) @@ -1197,7 +1199,8 @@ class WindowTranslationTest { .fold( ("", "", 1), { (acc: (String, String, Int), _) => acc }, - { (_, _, in: Iterable[(String, String, Int)], out: Collector[(String, Int)]) => + { (_: String, _: TimeWindow, in: Iterable[(String, String, Int)], + out: Collector[(String, Int)]) => in foreach { x => out.collect((x._1, x._3)) } }) diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml index 3df948a1240d1..bfc1c25407ef3 100644 --- a/flink-tests/pom.xml +++ b/flink-tests/pom.xml @@ -621,6 +621,7 @@ under the License. + scala From edcf0e3fc25e19e98649540ed55ec94160b2805f Mon Sep 17 00:00:00 2001 From: Jens Kat Date: Fri, 7 Apr 2017 22:14:49 +0200 Subject: [PATCH 09/31] take into account that only kafka 0.10 is for scala 2.12 --- .../flink-connector-kafka-0.10/pom.xml | 2 +- .../kafka/KafkaTestEnvironmentImpl.java | 6 +++++- .../flink-connector-kafka-base/pom.xml | 2 +- flink-connectors/pom.xml | 16 ++++++++++++++-- 4 files changed, 21 insertions(+), 5 deletions(-) diff --git a/flink-connectors/flink-connector-kafka-0.10/pom.xml b/flink-connectors/flink-connector-kafka-0.10/pom.xml index e061178b8488f..e398ef277b2bf 100644 --- a/flink-connectors/flink-connector-kafka-0.10/pom.xml +++ b/flink-connectors/flink-connector-kafka-0.10/pom.xml @@ -37,7 +37,7 @@ under the License. 
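+		<!-- Bumped because the 0.10.1.x line is (as far as published
+		     artifacts go) the first Kafka release train with Scala 2.12
+		     builds; the 0.8/0.9 connectors remain 2.10/2.11-only and are
+		     gated behind the pre-2.12 profile in flink-connectors/pom.xml. -->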
- 0.10.0.1 + 0.10.1.1 diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java index d27e53aa59d9c..1ff548a3db214 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java @@ -20,6 +20,7 @@ import kafka.admin.AdminUtils; import kafka.common.KafkaException; +import kafka.metrics.KafkaMetricsReporter; import kafka.server.KafkaConfig; import kafka.server.KafkaServer; import kafka.utils.SystemTime$; @@ -42,6 +43,7 @@ import org.apache.kafka.common.requests.MetadataResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.collection.Seq$; import java.io.File; import java.net.BindException; @@ -381,7 +383,9 @@ protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws Except try { scala.Option stringNone = scala.Option.apply(null); - KafkaServer server = new KafkaServer(kafkaConfig, SystemTime$.MODULE$, stringNone); + scala.collection.Seq emptyReporter = + Seq$.MODULE$.empty(); + KafkaServer server = new KafkaServer(kafkaConfig, SystemTime$.MODULE$, stringNone, emptyReporter); server.startup(); return server; } diff --git a/flink-connectors/flink-connector-kafka-base/pom.xml b/flink-connectors/flink-connector-kafka-base/pom.xml index fa401bd36d94a..3cb8b7cca5147 100644 --- a/flink-connectors/flink-connector-kafka-base/pom.xml +++ b/flink-connectors/flink-connector-kafka-base/pom.xml @@ -37,7 +37,7 @@ under the License. - 0.8.2.2 + 0.10.1.1 diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml index 5d8ca700f951d..6889e87a10ef0 100644 --- a/flink-connectors/pom.xml +++ b/flink-connectors/pom.xml @@ -42,8 +42,6 @@ under the License. flink-hbase flink-hcatalog flink-connector-kafka-base - flink-connector-kafka-0.8 - flink-connector-kafka-0.9 flink-connector-kafka-0.10 flink-connector-elasticsearch-base flink-connector-elasticsearch @@ -100,6 +98,20 @@ under the License. 
flink-connector-elasticsearch5 + + + scala-2.12 + + 0.10.1.1 + + + + scala + + flink-connector-kafka-0.8 + flink-connector-kafka-0.9 + + From 3027e0fbc92627d42a0d2257e7623ee6ad371531 Mon Sep 17 00:00:00 2001 From: Jens Kat Date: Fri, 7 Apr 2017 22:31:29 +0200 Subject: [PATCH 10/31] gelly --- flink-libraries/flink-gelly-examples/pom.xml | 143 +++++++++++++----- flink-libraries/flink-gelly-scala/pom.xml | 141 ++++++++++++----- .../org/apache/flink/graph/scala/Graph.scala | 60 ++++---- 3 files changed, 239 insertions(+), 105 deletions(-) diff --git a/flink-libraries/flink-gelly-examples/pom.xml b/flink-libraries/flink-gelly-examples/pom.xml index e95aa37080a55..4117736be0c53 100644 --- a/flink-libraries/flink-gelly-examples/pom.xml +++ b/flink-libraries/flink-gelly-examples/pom.xml @@ -94,45 +94,6 @@ - - - net.alchim31.maven - scala-maven-plugin - - - - scala-compile-first - process-resources - - compile - - - - - - scala-test-compile - process-test-resources - - testCompile - - - - - - -Xms128m - -Xmx512m - - - - org.scalamacros - paradise_${scala.version} - ${scala.macros.version} - - - - @@ -222,4 +183,108 @@ + + + + scala + + + !scala-2.12 + + + + + + + + net.alchim31.maven + scala-maven-plugin + + + + scala-compile-first + process-resources + + compile + + + + + + scala-test-compile + process-test-resources + + testCompile + + + + + + -Xms128m + -Xmx512m + + + + org.scalamacros + paradise_${scala.version} + ${scala.macros.version} + + + + + + + + + + scala-2.12 + + + !scala-2.10 + + + + + + + + net.alchim31.maven + scala-maven-plugin + + + + scala-compile-first + process-resources + + compile + + + + + + scala-test-compile + process-test-resources + + testCompile + + + + + + -Xms128m + -Xmx512m + + + + + + + + + + diff --git a/flink-libraries/flink-gelly-scala/pom.xml b/flink-libraries/flink-gelly-scala/pom.xml index dad7237b5a7fc..f588884c7d22b 100644 --- a/flink-libraries/flink-gelly-scala/pom.xml +++ b/flink-libraries/flink-gelly-scala/pom.xml @@ -103,45 +103,6 @@ under the License. - - - net.alchim31.maven - scala-maven-plugin - - - - scala-compile-first - process-resources - - compile - - - - - - scala-test-compile - process-test-resources - - testCompile - - - - - - -Xms128m - -Xmx512m - - - - org.scalamacros - paradise_${scala.version} - ${scala.macros.version} - - - - @@ -237,4 +198,106 @@ under the License. 
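+	<!-- The scala-maven-plugin configuration is duplicated across the two
+	     profiles below because only the pre-2.12 build registers the
+	     macro-paradise compiler plugin; the scala-2.12 build compiles the
+	     same sources without it. -->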
+ + + scala + + + !scala-2.12 + + + + + + + + net.alchim31.maven + scala-maven-plugin + + + + scala-compile-first + process-resources + + compile + + + + + + scala-test-compile + process-test-resources + + testCompile + + + + + + -Xms128m + -Xmx512m + + + + org.scalamacros + paradise_${scala.version} + ${scala.macros.version} + + + + + + + + + + scala-2.12 + + + !scala-2.10 + + + + + + + + net.alchim31.maven + scala-maven-plugin + + + + scala-compile-first + process-resources + + compile + + + + + + scala-test-compile + process-test-resources + + testCompile + + + + + + -Xms128m + -Xmx512m + + + + + + + + + diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala index 27bc548d742f6..e8bc37c6cf67f 100644 --- a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala +++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala @@ -109,8 +109,8 @@ object Graph { def fromTupleDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV: TypeInformation : ClassTag](vertices: DataSet[(K, VV)], edges: DataSet[(K, K, EV)], env: ExecutionEnvironment): Graph[K, VV, EV] = { - val javaTupleVertices = vertices.map(v => new jtuple.Tuple2(v._1, v._2)).javaSet - val javaTupleEdges = edges.map(v => new jtuple.Tuple3(v._1, v._2, v._3)).javaSet + val javaTupleVertices = vertices.map((v: (K, VV)) => new jtuple.Tuple2(v._1, v._2)).javaSet + val javaTupleEdges = edges.map((v: (K, K, EV)) => new jtuple.Tuple3(v._1, v._2, v._3)).javaSet wrapGraph(jg.Graph.fromTupleDataSet[K, VV, EV](javaTupleVertices, javaTupleEdges, env.getJavaEnv)) } @@ -124,7 +124,7 @@ object Graph { */ def fromTupleDataSet[K: TypeInformation : ClassTag, EV: TypeInformation : ClassTag] (edges: DataSet[(K, K, EV)], env: ExecutionEnvironment): Graph[K, NullValue, EV] = { - val javaTupleEdges = edges.map(v => new jtuple.Tuple3(v._1, v._2, v._3)).javaSet + val javaTupleEdges = edges.map((v: (K, K, EV)) => new jtuple.Tuple3(v._1, v._2, v._3)).javaSet wrapGraph(jg.Graph.fromTupleDataSet[K, EV](javaTupleEdges, env.getJavaEnv)) } @@ -139,7 +139,7 @@ object Graph { def fromTupleDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV: TypeInformation : ClassTag](edges: DataSet[(K, K, EV)], vertexValueInitializer: MapFunction[K, VV], env: ExecutionEnvironment): Graph[K, VV, EV] = { - val javaTupleEdges = edges.map(v => new jtuple.Tuple3(v._1, v._2, v._3)).javaSet + val javaTupleEdges = edges.map((v: (K, K, EV)) => new jtuple.Tuple3(v._1, v._2, v._3)).javaSet wrapGraph(jg.Graph.fromTupleDataSet[K, VV, EV](javaTupleEdges, vertexValueInitializer, env.getJavaEnv)) } @@ -152,7 +152,7 @@ object Graph { */ def fromTuple2DataSet[K: TypeInformation : ClassTag](edges: DataSet[(K, K)], env: ExecutionEnvironment): Graph[K, NullValue, NullValue] = { - val javaTupleEdges = edges.map(v => new jtuple.Tuple2(v._1, v._2)).javaSet + val javaTupleEdges = edges.map((v: (K, K)) => new jtuple.Tuple2(v._1, v._2)).javaSet wrapGraph(jg.Graph.fromTuple2DataSet[K](javaTupleEdges, env.getJavaEnv)) } @@ -166,7 +166,7 @@ object Graph { def fromTuple2DataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag] (edges: DataSet[(K, K)], vertexValueInitializer: MapFunction[K, VV], env: ExecutionEnvironment): Graph[K, VV, NullValue] = { - val javaTupleEdges = edges.map(v => new jtuple.Tuple2(v._1, v._2)).javaSet + val javaTupleEdges = 
edges.map((v: (K, K)) => new jtuple.Tuple2(v._1, v._2)).javaSet wrapGraph(jg.Graph.fromTuple2DataSet[K, VV](javaTupleEdges, vertexValueInitializer, env.getJavaEnv)) } @@ -259,7 +259,7 @@ object Graph { ignoreCommentsEdges, lenientEdges, includedFieldsEdges) - .map(edge => (edge._1, edge._2, NullValue.getInstance)) + .map((edge: (K, K)) => (edge._1, edge._2, NullValue.getInstance)) .asInstanceOf[DataSet[(K, K, EV)]] } else { env.readCsvFile[(K, K, EV)]( @@ -331,14 +331,16 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) { * @return the vertex DataSet as Tuple2. */ def getVerticesAsTuple2(): DataSet[(K, VV)] = { - wrap(jgraph.getVerticesAsTuple2).map(jtuple => (jtuple.f0, jtuple.f1)) + wrap(jgraph.getVerticesAsTuple2).map((javatuple: jtuple.Tuple2[K, VV]) + => (javatuple.f0, javatuple.f1)) } /** * @return the edge DataSet as Tuple3. */ def getEdgesAsTuple3(): DataSet[(K, K, EV)] = { - wrap(jgraph.getEdgesAsTuple3).map(jtuple => (jtuple.f0, jtuple.f1, jtuple.f2)) + wrap(jgraph.getEdgesAsTuple3).map((javatuple: jtuple.Tuple3[K, K, EV]) + => (javatuple.f0, javatuple.f1, javatuple.f2)) } /** @@ -508,7 +510,7 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) { */ def joinWithVertices[T: TypeInformation](inputDataSet: DataSet[(K, T)], vertexJoinFunction: VertexJoinFunction[VV, T]): Graph[K, VV, EV] = { - val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple2(scalatuple._1, + val javaTupleSet = inputDataSet.map((scalatuple: (K, T)) => new jtuple.Tuple2(scalatuple._1, scalatuple._2)).javaSet wrapGraph(jgraph.joinWithVertices[T](javaTupleSet, vertexJoinFunction)) } @@ -537,7 +539,7 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) { cleanFun(vertexValue, inputValue) } } - val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple2(scalatuple._1, + val javaTupleSet = inputDataSet.map((scalatuple: (K, T)) => new jtuple.Tuple2(scalatuple._1, scalatuple._2)).javaSet wrapGraph(jgraph.joinWithVertices[T](javaTupleSet, newVertexJoin)) } @@ -559,8 +561,8 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) { */ def joinWithEdges[T: TypeInformation](inputDataSet: DataSet[(K, K, T)], edgeJoinFunction: EdgeJoinFunction[EV, T]): Graph[K, VV, EV] = { - val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple3(scalatuple._1, - scalatuple._2, scalatuple._3)).javaSet + val javaTupleSet = inputDataSet.map((scalatuple: (K, K, T)) => + new jtuple.Tuple3(scalatuple._1, scalatuple._2, scalatuple._3)).javaSet wrapGraph(jgraph.joinWithEdges[T](javaTupleSet, edgeJoinFunction)) } @@ -588,8 +590,8 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) { cleanFun(edgeValue, inputValue) } } - val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple3(scalatuple._1, - scalatuple._2, scalatuple._3)).javaSet + val javaTupleSet = inputDataSet.map((scalatuple: (K, K, T)) => + new jtuple.Tuple3(scalatuple._1, scalatuple._2, scalatuple._3)).javaSet wrapGraph(jgraph.joinWithEdges[T](javaTupleSet, newEdgeJoin)) } @@ -611,7 +613,7 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) { */ def joinWithEdgesOnSource[T: TypeInformation](inputDataSet: DataSet[(K, T)], edgeJoinFunction: EdgeJoinFunction[EV, T]): Graph[K, VV, EV] = { - val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple2(scalatuple._1, + val javaTupleSet = inputDataSet.map((scalatuple: (K, T)) => new jtuple.Tuple2(scalatuple._1, scalatuple._2)).javaSet wrapGraph(jgraph.joinWithEdgesOnSource[T](javaTupleSet, edgeJoinFunction)) } @@ -641,7 +643,7 @@ TypeInformation : 
ClassTag](jgraph: jg.Graph[K, VV, EV]) { cleanFun(edgeValue, inputValue) } } - val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple2(scalatuple._1, + val javaTupleSet = inputDataSet.map((scalatuple: (K, T)) => new jtuple.Tuple2(scalatuple._1, scalatuple._2)).javaSet wrapGraph(jgraph.joinWithEdgesOnSource[T](javaTupleSet, newEdgeJoin)) } @@ -664,7 +666,7 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) { */ def joinWithEdgesOnTarget[T: TypeInformation](inputDataSet: DataSet[(K, T)], edgeJoinFunction: EdgeJoinFunction[EV, T]): Graph[K, VV, EV] = { - val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple2(scalatuple._1, + val javaTupleSet = inputDataSet.map((scalatuple: (K, T)) => new jtuple.Tuple2(scalatuple._1, scalatuple._2)).javaSet wrapGraph(jgraph.joinWithEdgesOnTarget[T](javaTupleSet, edgeJoinFunction)) } @@ -694,7 +696,7 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) { cleanFun(edgeValue, inputValue) } } - val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple2(scalatuple._1, + val javaTupleSet = inputDataSet.map((scalatuple: (K, T)) => new jtuple.Tuple2(scalatuple._1, scalatuple._2)).javaSet wrapGraph(jgraph.joinWithEdgesOnTarget[T](javaTupleSet, newEdgeJoin)) } @@ -799,7 +801,8 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) { * @return A DataSet of Tuple2 */ def inDegrees(): DataSet[(K, LongValue)] = { - wrap(jgraph.inDegrees).map(javatuple => (javatuple.f0, javatuple.f1)) + wrap(jgraph.inDegrees).map((javatuple: jtuple.Tuple2[K, LongValue]) => + (javatuple.f0, javatuple.f1)) } /** @@ -808,7 +811,8 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) { * @return A DataSet of Tuple2 */ def outDegrees(): DataSet[(K, LongValue)] = { - wrap(jgraph.outDegrees).map(javatuple => (javatuple.f0, javatuple.f1)) + wrap(jgraph.outDegrees).map((javatuple: jtuple.Tuple2[K, LongValue]) => + (javatuple.f0, javatuple.f1)) } /** @@ -817,7 +821,8 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) { * @return A DataSet of Tuple2 */ def getDegrees(): DataSet[(K, LongValue)] = { - wrap(jgraph.getDegrees).map(javatuple => (javatuple.f0, javatuple.f1)) + wrap(jgraph.getDegrees).map((javatuple: jtuple.Tuple2[K, LongValue]) => + (javatuple.f0, javatuple.f1)) } /** @@ -927,7 +932,8 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) { * @return The IDs of the edges as DataSet */ def getEdgeIds(): DataSet[(K, K)] = { - wrap(jgraph.getEdgeIds).map(jtuple => (jtuple.f0, jtuple.f1)) + wrap(jgraph.getEdgeIds).map((javatuple: jtuple.Tuple2[K, K]) => + (javatuple.f0, javatuple.f1)) } /** @@ -1083,8 +1089,8 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) { */ def reduceOnNeighbors(reduceNeighborsFunction: ReduceNeighborsFunction[VV], direction: EdgeDirection): DataSet[(K, VV)] = { - wrap(jgraph.reduceOnNeighbors(reduceNeighborsFunction, direction)).map(jtuple => (jtuple - .f0, jtuple.f1)) + wrap(jgraph.reduceOnNeighbors(reduceNeighborsFunction, direction)) + .map((javatuple: jtuple.Tuple2[K, VV]) => (javatuple.f0, javatuple.f1)) } /** @@ -1102,8 +1108,8 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) { */ def reduceOnEdges(reduceEdgesFunction: ReduceEdgesFunction[EV], direction: EdgeDirection): DataSet[(K, EV)] = { - wrap(jgraph.reduceOnEdges(reduceEdgesFunction, direction)).map(jtuple => (jtuple.f0, - jtuple.f1)) + wrap(jgraph.reduceOnEdges(reduceEdgesFunction, direction)) + .map((javatuple: jtuple.Tuple2[K, EV]) => (javatuple.f0, javatuple.f1)) } /** From 
b9445788a99a5939677603fe6c6f5a05dcfd4bf0 Mon Sep 17 00:00:00 2001 From: Jens Kat Date: Fri, 7 Apr 2017 22:35:35 +0200 Subject: [PATCH 11/31] extract version breeze --- flink-libraries/flink-ml/pom.xml | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/flink-libraries/flink-ml/pom.xml b/flink-libraries/flink-ml/pom.xml index bea956e87d48c..6296e86aee273 100644 --- a/flink-libraries/flink-ml/pom.xml +++ b/flink-libraries/flink-ml/pom.xml @@ -33,6 +33,9 @@ jar + + 0.12 + @@ -47,7 +50,7 @@ org.scalanlp breeze_${scala.binary.version} - 0.12 + ${breeze.version} @@ -122,6 +125,13 @@ (IT|Integration)(Test|Suite|Case) + + + scala-2.12 + + 0.13 + + @@ -201,4 +211,5 @@ + From ac89e248065fda2337e37555374f40c4c7788506 Mon Sep 17 00:00:00 2001 From: Jens Kat Date: Fri, 7 Apr 2017 22:58:48 +0200 Subject: [PATCH 12/31] fix flink ml --- flink-libraries/flink-ml/pom.xml | 9 +-------- .../scala/org/apache/flink/ml/classification/SVM.scala | 4 ++-- .../org/apache/flink/ml/common/FlinkMLTools.scala | 2 +- .../src/main/scala/org/apache/flink/ml/nn/KNN.scala | 6 ++++-- .../flink/ml/outlier/StochasticOutlierSelection.scala | 2 +- .../apache/flink/ml/preprocessing/MinMaxScaler.scala | 2 +- .../flink/ml/preprocessing/PolynomialFeatures.scala | 2 +- .../org/apache/flink/ml/preprocessing/Splitter.scala | 10 +++++----- .../apache/flink/ml/preprocessing/StandardScaler.scala | 4 ++-- .../scala/org/apache/flink/ml/recommendation/ALS.scala | 4 ++-- 10 files changed, 20 insertions(+), 25 deletions(-) diff --git a/flink-libraries/flink-ml/pom.xml b/flink-libraries/flink-ml/pom.xml index 6296e86aee273..9f9e71047b51d 100644 --- a/flink-libraries/flink-ml/pom.xml +++ b/flink-libraries/flink-ml/pom.xml @@ -34,7 +34,7 @@ jar - 0.12 + 0.13 @@ -125,13 +125,6 @@ (IT|Integration)(Test|Suite|Case) - - - scala-2.12 - - 0.13 - - diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala index 721dd69308219..a4d962c48b9d4 100644 --- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala +++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala @@ -349,7 +349,7 @@ object SVM{ // Count the number of vectors, but keep the value in a DataSet to broadcast it later // TODO: Once efficient count and intermediate result partitions are implemented, use count - val numberVectors = input map { x => 1 } reduce { _ + _ } + val numberVectors: DataSet[Int] = input map { x: LabeledVector => 1 } reduce { _ + _ } // Group the input data into blocks in round robin fashion val blockedInputNumberElements = FlinkMLTools.block( @@ -357,7 +357,7 @@ object SVM{ blocks, Some(ModuloKeyPartitioner)). cross(numberVectors). 
- map { x => x } + map { (x: (Block[LabeledVector], Int)) => x } val resultingWeights = initialWeights.iterate(iterations) { weights => { diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkMLTools.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkMLTools.scala index bfc72a4fcb120..104f18edf9e29 100644 --- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkMLTools.scala +++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkMLTools.scala @@ -376,7 +376,7 @@ object FlinkMLTools { partitionerOption: Option[Partitioner[Int]] = None) : DataSet[Block[T]] = { val blockIDInput = input map { - element => + (element: T) => val blockID = element.hashCode() % numBlocks val blockIDResult = if(blockID < 0){ diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/nn/KNN.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/nn/KNN.scala index 527e6365ff627..be91d5d377763 100644 --- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/nn/KNN.scala +++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/nn/KNN.scala @@ -227,7 +227,8 @@ object KNN { // join input and training set val crossed = crossTuned.mapPartition { - (iter, out: Collector[(FlinkVector, FlinkVector, Long, Double)]) => { + (iter: Iterator[(Block[FlinkVector], Block[(Long, T)])], + out: Collector[(FlinkVector, FlinkVector, Long, Double)]) => { for ((training, testing) <- iter) { // use a quadtree if (4 ^ dim) * Ntest * log(Ntrain) // < Ntest * Ntrain, and distance is Euclidean @@ -252,7 +253,8 @@ object KNN { // group by input vector id and pick k nearest neighbor for each group val result = crossed.groupBy(2).sortGroup(3, Order.ASCENDING).reduceGroup { - (iter, out: Collector[(FlinkVector, Array[FlinkVector])]) => { + (iter: Iterator[(FlinkVector, FlinkVector, Long, Double)], + out: Collector[(FlinkVector, Array[FlinkVector])]) => { if (iter.hasNext) { val head = iter.next() val key = head._2 diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/outlier/StochasticOutlierSelection.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/outlier/StochasticOutlierSelection.scala index 2c04bb05fa4e2..3dfefbd6db243 100644 --- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/outlier/StochasticOutlierSelection.scala +++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/outlier/StochasticOutlierSelection.scala @@ -197,7 +197,7 @@ object StochasticOutlierSelection extends WithParameters { val resultingParameters = instance.parameters ++ transformParameters // Map to the right format - val vectorsWithIndex = input.zipWithUniqueId.map(vector => { + val vectorsWithIndex = input.zipWithUniqueId.map((vector: (Long, T)) => { BreezeLabeledVector(vector._1.toInt, vector._2.asBreeze) }) diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/MinMaxScaler.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/MinMaxScaler.scala index 217e2c26fed16..b37748fb09cf3 100644 --- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/MinMaxScaler.scala +++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/MinMaxScaler.scala @@ -149,7 +149,7 @@ object MinMaxScaler { : DataSet[(linalg.Vector[Double], linalg.Vector[Double])] = { val minMax = dataSet.map { - v => (v.asBreeze, v.asBreeze) + v: T => (v.asBreeze, v.asBreeze) }.reduce { (minMax1, minMax2) => 
{ diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/PolynomialFeatures.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/PolynomialFeatures.scala index f1c788e4fafd4..977428d953126 100644 --- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/PolynomialFeatures.scala +++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/PolynomialFeatures.scala @@ -116,7 +116,7 @@ object PolynomialFeatures{ val degree = resultingParameters(Degree) input.map { - vector => { + vector: T => { calculatePolynomial(degree, vector) } } diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala index 46b14624823b0..f14d2e34353fc 100644 --- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala +++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala @@ -76,7 +76,7 @@ object Splitter { } } - val leftSplitLight = leftSplit.map(o => (o._1, false)) + val leftSplitLight = leftSplit.map((o: (Long, T)) => (o._1, false)) val rightSplit: DataSet[T] = indexedInput.leftOuterJoin[(Long, Boolean)](leftSplitLight) .where(0) @@ -87,7 +87,7 @@ object Splitter { } } - Array(leftSplit.map(o => o._2), rightSplit) + Array(leftSplit.map((o: (Long, T)) => o._2), rightSplit) } // -------------------------------------------------------------------------------------------- @@ -117,14 +117,14 @@ object Splitter { eid.reseedRandomGenerator(seed) - val tempDS: DataSet[(Int,T)] = input.map(o => (eid.sample, o)) + val tempDS: DataSet[(Int,T)] = input.map((o: T) => (eid.sample, o)) val splits = fracArray.length val outputArray = new Array[DataSet[T]]( splits ) for (k <- 0 to splits-1){ - outputArray(k) = tempDS.filter(o => o._1 == k) - .map(o => o._2) + outputArray(k) = tempDS.filter((o: (Int, T)) => o._1 == k) + .map((o: (Int, T)) => o._2) } outputArray diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala index 82e8abf4df1b6..fab936f6e9c44 100644 --- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala +++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala @@ -158,7 +158,7 @@ object StandardScaler { fitParameters: ParameterMap, input: DataSet[(T, Double)]) : Unit = { - val vectorDS = input.map(_._1) + val vectorDS = input.map((i: (T, Double)) => i._1) val metrics = extractFeatureMetrics(vectorDS) instance.metricsOption = Some(metrics) @@ -180,7 +180,7 @@ object StandardScaler { private def extractFeatureMetrics[T <: Vector](dataSet: DataSet[T]) : DataSet[(linalg.Vector[Double], linalg.Vector[Double])] = { val metrics = dataSet.map{ - v => (1.0, v.asBreeze, linalg.Vector.zeros[Double](v.size)) + (v: T) => (1.0, v.asBreeze, linalg.Vector.zeros[Double](v.size)) }.reduce{ (metrics1, metrics2) => { /* We use formula 1.5b of the cited technical report for the combination of partial diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala index 04543813994cc..8016341b361ef 100644 --- 
a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala +++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala @@ -210,7 +210,7 @@ class ALS extends Predictor[ALS] { val predictions = data.join(userFactors, JoinHint.REPARTITION_HASH_SECOND).where(0) .equalTo(0).join(itemFactors, JoinHint.REPARTITION_HASH_SECOND).where("_1._2") .equalTo(0).map { - triple => { + (triple: (((Long, Long), ALS.Factors), ALS.Factors)) => { val (((uID, iID), uFactors), iFactors) = triple val uFactorsVector = uFactors.factors @@ -404,7 +404,7 @@ object ALS { case Some((userFactors, itemFactors)) => { input.join(userFactors, JoinHint.REPARTITION_HASH_SECOND).where(0).equalTo(0) .join(itemFactors, JoinHint.REPARTITION_HASH_SECOND).where("_1._2").equalTo(0).map { - triple => { + (triple: (((Long, Long), ALS.Factors), ALS.Factors)) => { val (((uID, iID), uFactors), iFactors) = triple val uFactorsVector = uFactors.factors From f9918c83736d76ac3a4332ed5bad0ba01d0c5b4c Mon Sep 17 00:00:00 2001 From: Jens Kat Date: Fri, 7 Apr 2017 23:11:50 +0200 Subject: [PATCH 13/31] add iloopcompat for scala-2.12 --- flink-scala-shell/pom.xml | 53 ++++++++++++++----- .../apache/flink/api/scala/ILoopCompat.scala | 37 +++++++++++++ 2 files changed, 77 insertions(+), 13 deletions(-) create mode 100644 flink-scala-shell/src/main/scala-2.12/org/apache/flink/api/scala/ILoopCompat.scala diff --git a/flink-scala-shell/pom.xml b/flink-scala-shell/pom.xml index cf637d4afcbc9..5cb0996031536 100644 --- a/flink-scala-shell/pom.xml +++ b/flink-scala-shell/pom.xml @@ -121,13 +121,6 @@ under the License. -Xms128m -Xmx512m - - - org.scalamacros - paradise_${scala.version} - ${scala.macros.version} - - @@ -212,12 +205,24 @@ under the License. scala-2.10 - - - - !scala-2.11 - - + + + + + net.alchim31.maven + scala-maven-plugin + + + + org.scalamacros + paradise_${scala.version} + ${scala.macros.version} + + + + + + org.scalamacros @@ -232,6 +237,28 @@ under the License. + + scala-2.11 + + + + + net.alchim31.maven + scala-maven-plugin + + + + org.scalamacros + paradise_${scala.version} + ${scala.macros.version} + + + + + + + + diff --git a/flink-scala-shell/src/main/scala-2.12/org/apache/flink/api/scala/ILoopCompat.scala b/flink-scala-shell/src/main/scala-2.12/org/apache/flink/api/scala/ILoopCompat.scala new file mode 100644 index 0000000000000..a3cc7dfa7cfa7 --- /dev/null +++ b/flink-scala-shell/src/main/scala-2.12/org/apache/flink/api/scala/ILoopCompat.scala @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.scala + +import java.io.BufferedReader + +import _root_.scala.tools.nsc.interpreter._ +import _root_.scala.io.AnsiColor.{MAGENTA, RESET} + +class ILoopCompat( + in0: Option[BufferedReader], + out0: JPrintWriter) + extends ILoop(in0, out0) { + + override def prompt = { + val promptStr = "Scala-Flink> " + s"$MAGENTA$promptStr$RESET" + } + + protected def addThunk(f: => Unit): Unit = f +} From 3e6482a159b5ef4337275072d2379d62799c45bd Mon Sep 17 00:00:00 2001 From: Jens Kat Date: Sat, 8 Apr 2017 09:35:32 +0200 Subject: [PATCH 14/31] make it compile with 2.12 --- flink-contrib/flink-streaming-contrib/pom.xml | 63 ++++++++++++-- flink-mesos/pom.xml | 83 ++++++++++++------- .../LaunchableMesosWorker.java | 6 +- .../scheduler/LaunchCoordinatorTest.scala | 10 +-- .../ReconciliationCoordinatorTest.scala | 6 +- .../mesos/scheduler/TaskMonitorTest.scala | 7 +- flink-yarn/pom.xml | 74 ++++++++++++----- pom.xml | 6 ++ 8 files changed, 186 insertions(+), 69 deletions(-) diff --git a/flink-contrib/flink-streaming-contrib/pom.xml b/flink-contrib/flink-streaming-contrib/pom.xml index a218f120de4c5..6523c8a1da6fa 100644 --- a/flink-contrib/flink-streaming-contrib/pom.xml +++ b/flink-contrib/flink-streaming-contrib/pom.xml @@ -100,13 +100,6 @@ under the License. -Xms128m -Xmx512m - - - org.scalamacros - paradise_${scala.version} - ${scala.macros.version} - - @@ -184,4 +177,60 @@ under the License. + + + + scala + + + !scala-2.12 + + + + + + + + net.alchim31.maven + scala-maven-plugin + + + + scala-compile-first + process-resources + + compile + + + + + + scala-test-compile + process-test-resources + + testCompile + + + + + + -Xms128m + -Xmx512m + + + + org.scalamacros + paradise_${scala.version} + ${scala.macros.version} + + + + + + + + + diff --git a/flink-mesos/pom.xml b/flink-mesos/pom.xml index eea5da7792bba..b2a59a1d053fc 100644 --- a/flink-mesos/pom.xml +++ b/flink-mesos/pom.xml @@ -60,28 +60,6 @@ under the License. ${project.version} - - com.data-artisans - flakka-actor_${scala.binary.version} - - - - com.data-artisans - flakka-remote_${scala.binary.version} - - - - com.google.protobuf - protobuf-java - - - - - - com.data-artisans - flakka-slf4j_${scala.binary.version} - - org.apache.mesos mesos @@ -122,12 +100,6 @@ under the License. test - - com.data-artisans - flakka-testkit_${scala.binary.version} - test - - org.apache.flink flink-runtime_2.10 @@ -287,4 +259,59 @@ under the License. 
+ + + + + scala-2.12 + + + + !scala-2.11 + + + + + com.typesafe.akka + akka-actor_2.12 + + + com.typesafe.akka + akka-slf4j_2.12 + + + com.typesafe.akka + akka-remote_2.12 + + + com.typesafe.akka + akka-testkit_2.12 + + + + + scala + + + com.data-artisans + flakka-actor_${scala.binary.version} + + + + com.data-artisans + flakka-remote_${scala.binary.version} + + + + com.data-artisans + flakka-slf4j_${scala.binary.version} + + + + com.data-artisans + flakka-testkit_${scala.binary.version} + + + + diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java index bfe9be818579e..69262cdc68cc4 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java @@ -126,7 +126,7 @@ public int getPorts() { } @Override - public Map getCustomNamedResources() { + public Map getCustomNamedResources() { return Collections.emptyMap(); } @@ -141,12 +141,12 @@ public List getSoftConstraints() { } @Override - public void setAssignedResources(AssignedResources assignedResources) { + public void setAssignedResources(TaskRequest.AssignedResources assignedResources) { this.assignedResources.set(assignedResources); } @Override - public AssignedResources getAssignedResources() { + public TaskRequest.AssignedResources getAssignedResources() { return assignedResources.get(); } diff --git a/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/LaunchCoordinatorTest.scala b/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/LaunchCoordinatorTest.scala index 34c1f66412950..363fbd7be9e23 100644 --- a/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/LaunchCoordinatorTest.scala +++ b/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/LaunchCoordinatorTest.scala @@ -21,29 +21,29 @@ package org.apache.flink.mesos.scheduler import java.util.{Collections, UUID} import java.util.concurrent.atomic.AtomicReference +import akka.actor.ActorSystem import akka.actor.FSM.StateTimeout import akka.testkit._ import com.netflix.fenzo.TaskRequest.{AssignedResources, NamedResourceSetRequest} import com.netflix.fenzo._ import com.netflix.fenzo.functions.{Action1, Action2} import com.netflix.fenzo.plugins.VMLeaseObject -import org.apache.flink.api.java.tuple.{Tuple2=>FlinkTuple2} +import org.apache.flink.api.java.tuple.{Tuple2 => FlinkTuple2} import org.apache.flink.configuration.Configuration import org.apache.flink.mesos.scheduler.LaunchCoordinator._ import org.apache.flink.mesos.scheduler.messages._ import org.apache.flink.runtime.akka.AkkaUtils import org.apache.mesos.Protos.{SlaveID, TaskInfo} -import org.apache.mesos.{SchedulerDriver, Protos} +import org.apache.mesos.{Protos, SchedulerDriver} import org.junit.runner.RunWith import org.mockito.Mockito.{verify, _} import org.mockito.invocation.InvocationOnMock import org.mockito.stubbing.Answer -import org.mockito.{Matchers => MM, Mockito} +import org.mockito.{Mockito, Matchers => MM} import org.scalatest.junit.JUnitRunner import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} import scala.collection.JavaConverters._ - import org.apache.flink.mesos.Utils.range import org.apache.flink.mesos.Utils.ranges import org.apache.flink.mesos.Utils.scalar @@ -57,7 +57,7 @@ class LaunchCoordinatorTest with BeforeAndAfterAll { lazy val config = new 
Configuration() - implicit lazy val system = AkkaUtils.createLocalActorSystem(config) + implicit lazy val system: ActorSystem = AkkaUtils.createLocalActorSystem(config) override def afterAll(): Unit = { TestKit.shutdownActorSystem(system) diff --git a/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/ReconciliationCoordinatorTest.scala b/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/ReconciliationCoordinatorTest.scala index c22385204ce16..cc3792c7994ef 100644 --- a/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/ReconciliationCoordinatorTest.scala +++ b/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/ReconciliationCoordinatorTest.scala @@ -20,14 +20,14 @@ package org.apache.flink.mesos.scheduler import java.util.UUID -import akka.actor.FSM +import akka.actor.{ActorSystem, FSM} import akka.testkit._ import org.apache.flink.configuration.Configuration import org.apache.flink.mesos.Matchers._ import org.apache.flink.mesos.scheduler.messages.{Connected, Disconnected, StatusUpdate} import org.apache.flink.runtime.akka.AkkaUtils import org.apache.mesos.Protos.TaskState._ -import org.apache.mesos.{SchedulerDriver, Protos} +import org.apache.mesos.{Protos, SchedulerDriver} import org.junit.runner.RunWith import org.mockito.Mockito import org.mockito.Mockito._ @@ -45,7 +45,7 @@ class ReconciliationCoordinatorTest import ReconciliationCoordinator._ lazy val config = new Configuration() - implicit lazy val system = AkkaUtils.createLocalActorSystem(config) + implicit lazy val system: ActorSystem = AkkaUtils.createLocalActorSystem(config) override def afterAll(): Unit = { TestKit.shutdownActorSystem(system) diff --git a/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/TaskMonitorTest.scala b/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/TaskMonitorTest.scala index b4ef93837c1a7..01d10f91e1605 100644 --- a/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/TaskMonitorTest.scala +++ b/flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/TaskMonitorTest.scala @@ -20,14 +20,15 @@ package org.apache.flink.mesos.scheduler import java.util.UUID +import akka.actor.ActorSystem import akka.actor.FSM.StateTimeout import akka.testkit._ import org.apache.flink.configuration.Configuration import org.apache.flink.mesos.TestFSMUtils import org.apache.flink.mesos.scheduler.ReconciliationCoordinator.Reconcile -import org.apache.flink.mesos.scheduler.messages.{Disconnected, Connected, StatusUpdate} +import org.apache.flink.mesos.scheduler.messages.{Connected, Disconnected, StatusUpdate} import org.apache.flink.runtime.akka.AkkaUtils -import org.apache.mesos.{SchedulerDriver, Protos} +import org.apache.mesos.{Protos, SchedulerDriver} import org.apache.mesos.Protos.TaskState._ import org.junit.runner.RunWith import org.mockito.Mockito @@ -46,7 +47,7 @@ class TaskMonitorTest import TaskMonitor._ lazy val config = new Configuration() - implicit lazy val system = AkkaUtils.createLocalActorSystem(config) + implicit lazy val system: ActorSystem = AkkaUtils.createLocalActorSystem(config) override def afterAll(): Unit = { TestKit.shutdownActorSystem(system) diff --git a/flink-yarn/pom.xml b/flink-yarn/pom.xml index 11d238ace301e..8263c93621555 100644 --- a/flink-yarn/pom.xml +++ b/flink-yarn/pom.xml @@ -67,32 +67,12 @@ under the License. 
test - - com.data-artisans - flakka-actor_${scala.binary.version} - - - - com.data-artisans - flakka-remote_${scala.binary.version} - - - - com.data-artisans - flakka-camel_${scala.binary.version} - - com.google.guava guava ${guava.version} - - com.data-artisans - flakka-testkit_${scala.binary.version} - test - @@ -242,4 +222,58 @@ under the License. + + + + scala-2.12 + + + + !scala-2.11 + + + + + com.typesafe.akka + akka-actor_2.12 + + + com.typesafe.akka + akka-camel_2.12 + + + com.typesafe.akka + akka-remote_2.12 + + + com.typesafe.akka + akka-testkit_2.12 + + + + + scala + + + com.data-artisans + flakka-actor_${scala.binary.version} + + + + com.data-artisans + flakka-remote_${scala.binary.version} + + + + com.data-artisans + flakka-camel_${scala.binary.version} + + + + com.data-artisans + flakka-testkit_${scala.binary.version} + + + + diff --git a/pom.xml b/pom.xml index 8c0983e00e962..22b639c30e8b4 100644 --- a/pom.xml +++ b/pom.xml @@ -416,6 +416,12 @@ under the License. test + + com.typesafe.akka + akka-camel_2.12 + ${akka.typesafe.version} + + org.scalatest scalatest_${scala.binary.version} From 566c46708a88daa94d61f4fca3c6317a7327b7fb Mon Sep 17 00:00:00 2001 From: Jens Kat Date: Sat, 8 Apr 2017 09:36:22 +0200 Subject: [PATCH 15/31] build all with travis --- .travis.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 088290b85234f..5e1aa4de89f6f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -19,7 +19,6 @@ matrix: - jdk: "oraclejdk8" env: PROFILE="-Dhadoop.version=2.7.2 -Dscala-2.12 -Pinclude-yarn-tests,flink-fast-tests-b,include-kinesis -Dmaven.javadoc.skip=true" - exclude: # Always run test groups A and B together - jdk: "oraclejdk8" env: PROFILE="-Dhadoop.version=2.7.2 -Dscala-2.11 -Pinclude-yarn-tests,flink-fast-tests-a,include-kinesis,jdk8 -Dmaven.javadoc.skip=true" From 64c99b6db30ca161a6d2991e75340d2eb8427f06 Mon Sep 17 00:00:00 2001 From: Jens Kat Date: Sat, 8 Apr 2017 10:07:58 +0200 Subject: [PATCH 16/31] profiles --- flink-clients/pom.xml | 8 +++++++- flink-mesos/pom.xml | 8 +++++++- flink-runtime-web/pom.xml | 8 +++++++- flink-runtime/pom.xml | 8 +++++++- flink-yarn/pom.xml | 8 +++++++- 5 files changed, 35 insertions(+), 5 deletions(-) diff --git a/flink-clients/pom.xml b/flink-clients/pom.xml index d2ff36d08ced9..f17295fe816a7 100644 --- a/flink-clients/pom.xml +++ b/flink-clients/pom.xml @@ -194,7 +194,7 @@ under the License. - !scala-2.11 + scala-2.12 @@ -206,6 +206,12 @@ under the License. scala + + + + !scala-2.12 + + com.data-artisans diff --git a/flink-mesos/pom.xml b/flink-mesos/pom.xml index b2a59a1d053fc..35b4f99ce3c57 100644 --- a/flink-mesos/pom.xml +++ b/flink-mesos/pom.xml @@ -267,7 +267,7 @@ under the License. - !scala-2.11 + scala-2.12 @@ -291,6 +291,12 @@ under the License. scala + + + + !scala-2.12 + + com.data-artisans diff --git a/flink-runtime-web/pom.xml b/flink-runtime-web/pom.xml index 5847096442747..f491680a81de7 100644 --- a/flink-runtime-web/pom.xml +++ b/flink-runtime-web/pom.xml @@ -134,7 +134,7 @@ under the License. - !scala-2.11 + scala-2.12 @@ -146,6 +146,12 @@ under the License. scala + + + + !scala-2.12 + + diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml index 1b79bcf6fe050..2538c4c3d2ffd 100644 --- a/flink-runtime/pom.xml +++ b/flink-runtime/pom.xml @@ -383,7 +383,7 @@ under the License. - !scala-2.11 + scala-2.12 @@ -407,6 +407,12 @@ under the License. 
scala + + + + !scala-2.12 + + com.data-artisans diff --git a/flink-yarn/pom.xml b/flink-yarn/pom.xml index 8263c93621555..082744bfc1aeb 100644 --- a/flink-yarn/pom.xml +++ b/flink-yarn/pom.xml @@ -229,7 +229,7 @@ under the License. - !scala-2.11 + scala-2.12 @@ -253,6 +253,12 @@ under the License. scala + + + + !scala-2.12 + + com.data-artisans From 2f3079d34f18ee853332890e950cf891e44e52c1 Mon Sep 17 00:00:00 2001 From: Jens Kat Date: Sat, 8 Apr 2017 10:22:04 +0200 Subject: [PATCH 17/31] update pom profile --- .travis.yml | 21 +++++--- flink-scala/pom.xml | 114 ++++++++++++++++---------------------------- 2 files changed, 57 insertions(+), 78 deletions(-) diff --git a/.travis.yml b/.travis.yml index 5e1aa4de89f6f..41593e50031fe 100644 --- a/.travis.yml +++ b/.travis.yml @@ -18,6 +18,8 @@ matrix: env: PROFILE="-Dhadoop.version=2.7.2 -Dscala-2.12 -Pinclude-yarn-tests,flink-fast-tests-a,include-kinesis -Dmaven.javadoc.skip=true" - jdk: "oraclejdk8" env: PROFILE="-Dhadoop.version=2.7.2 -Dscala-2.12 -Pinclude-yarn-tests,flink-fast-tests-b,include-kinesis -Dmaven.javadoc.skip=true" + - jdk: "oraclejdk8" + env: PROFILE="-Dhadoop.version=2.7.2 -Dscala-2.12 -Pinclude-yarn-tests,flink-fast-tests-c,include-kinesis -Dmaven.javadoc.skip=true" # Always run test groups A and B together - jdk: "oraclejdk8" @@ -28,11 +30,18 @@ matrix: env: PROFILE="-Dhadoop.version=2.7.2 -Dscala-2.11 -Pinclude-yarn-tests,flink-fast-tests-c,include-kinesis,jdk8 -Dmaven.javadoc.skip=true" - jdk: "oraclejdk8" - env: PROFILE="-Dhadoop.version=2.6.3 -Pinclude-yarn-tests,flink-fast-tests-a,include-kinesis -Dmaven.javadoc.skip=true" + env: PROFILE="-Dhadoop.version=2.6.3 -Dscala-2.10 -Pinclude-yarn-tests,flink-fast-tests-a,include-kinesis -Dmaven.javadoc.skip=true" - jdk: "oraclejdk8" - env: PROFILE="-Dhadoop.version=2.6.3 -Pinclude-yarn-tests,flink-fast-tests-b,include-kinesis -Dmaven.javadoc.skip=true" + env: PROFILE="-Dhadoop.version=2.6.3 -Dscala-2.10 -Pinclude-yarn-tests,flink-fast-tests-b,include-kinesis -Dmaven.javadoc.skip=true" - jdk: "oraclejdk8" - env: PROFILE="-Dhadoop.version=2.6.3 -Pinclude-yarn-tests,flink-fast-tests-c,include-kinesis -Dmaven.javadoc.skip=true" + env: PROFILE="-Dhadoop.version=2.6.3 -Dscala-2.10 -Pinclude-yarn-tests,flink-fast-tests-c,include-kinesis -Dmaven.javadoc.skip=true" + + - jdk: "openjdk7" + env: PROFILE="-Dhadoop.version=2.4.1 -Dscala-2.12 -Pinclude-yarn-tests,flink-fast-tests-a,include-kinesis" + - jdk: "openjdk7" + env: PROFILE="-Dhadoop.version=2.4.1 -Dscala-2.12 -Pinclude-yarn-tests,flink-fast-tests-b,include-kinesis" + - jdk: "openjdk7" + env: PROFILE="-Dhadoop.version=2.4.1 -Dscala-2.12 -Pinclude-yarn-tests,flink-fast-tests-c,include-kinesis" - jdk: "openjdk7" env: PROFILE="-Dhadoop.version=2.4.1 -Dscala-2.11 -Pinclude-yarn-tests,flink-fast-tests-a,include-kinesis" @@ -42,11 +51,11 @@ matrix: env: PROFILE="-Dhadoop.version=2.4.1 -Dscala-2.11 -Pinclude-yarn-tests,flink-fast-tests-c,include-kinesis" - jdk: "oraclejdk7" - env: PROFILE="-Dhadoop.version=2.3.0 -Pflink-fast-tests-a,include-kinesis" + env: PROFILE="-Dhadoop.version=2.3.0 -Dscala-2.10 -Pflink-fast-tests-a,include-kinesis" - jdk: "oraclejdk7" - env: PROFILE="-Dhadoop.version=2.3.0 -Pflink-fast-tests-b,include-kinesis" + env: PROFILE="-Dhadoop.version=2.3.0 -Dscala-2.10 -Pflink-fast-tests-b,include-kinesis" - jdk: "oraclejdk7" - env: PROFILE="-Dhadoop.version=2.3.0 -Pflink-fast-tests-c,include-kinesis" + env: PROFILE="-Dhadoop.version=2.3.0 -Dscala-2.10 -Pflink-fast-tests-c,include-kinesis" git: diff --git 
a/flink-scala/pom.xml b/flink-scala/pom.xml index 26b688eead386..a62200b43f25f 100644 --- a/flink-scala/pom.xml +++ b/flink-scala/pom.xml @@ -106,6 +106,39 @@ under the License. com.github.siom79.japicmp japicmp-maven-plugin + + + + net.alchim31.maven + scala-maven-plugin + + + + scala-compile-first + process-resources + + compile + + + + + + scala-test-compile + process-test-resources + + testCompile + + + + + + -Xms128m + -Xmx512m + + + @@ -188,7 +221,7 @@ under the License. scala-2.10 - !scala-2.12 + scala-2.10 @@ -198,6 +231,14 @@ under the License. ${scala.macros.version} + + + scala + + + !scala-2.12 + + @@ -205,32 +246,7 @@ under the License. net.alchim31.maven scala-maven-plugin - - - - scala-compile-first - process-resources - - compile - - - - - - scala-test-compile - process-test-resources - - testCompile - - - - - -Xms128m - -Xmx512m - org.scalamacros @@ -244,52 +260,6 @@ under the License. - - - scala-2.12 - - - !scala-2.10 - - - - - - - - net.alchim31.maven - scala-maven-plugin - - - - scala-compile-first - process-resources - - compile - - - - - - scala-test-compile - process-test-resources - - testCompile - - - - - - -Xms128m - -Xmx512m - - - - - - From 4e62b3f0220617cb02459353e7670d7fb1104a2a Mon Sep 17 00:00:00 2001 From: Jens Kat Date: Sat, 8 Apr 2017 10:24:24 +0200 Subject: [PATCH 18/31] updated pom profile --- flink-tests/pom.xml | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml index bfc1c25407ef3..29954526f963a 100644 --- a/flink-tests/pom.xml +++ b/flink-tests/pom.xml @@ -610,7 +610,7 @@ under the License. - !scala-2.11 + scala-2.12 @@ -624,6 +624,12 @@ under the License. scala + + + + !scala-2.12 + + com.data-artisans From 83d5059bf0b4b49a8d3c8abdf98b2e00a68277f0 Mon Sep 17 00:00:00 2001 From: Jens Kat Date: Sat, 8 Apr 2017 10:28:19 +0200 Subject: [PATCH 19/31] update javassist java.lang.IllegalStateException: Failed to transform class with name scala.concurrent.duration.Duration. Reason: javassist.bytecode.InterfaceMethodrefInfo cannot be cast to javassist.bytecode.MethodrefInfo http://stackoverflow.com/questions/31189086/powermock-and-java-8-issue-interfacemethodrefinfo-cannot-be-cast-to-methodrefin#37217871 --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 22b639c30e8b4..5ad1896a445e2 100644 --- a/pom.xml +++ b/pom.xml @@ -316,7 +316,7 @@ under the License. org.javassist javassist - 3.18.2-GA + 3.20.0-GA From b86648d4318b4f2306dfaeae11d0c42bb3e87fbb Mon Sep 17 00:00:00 2001 From: Jens Kat Date: Sat, 8 Apr 2017 10:53:51 +0200 Subject: [PATCH 20/31] bump scalatest --- pom.xml | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pom.xml b/pom.xml index 5ad1896a445e2..836de57dea8aa 100644 --- a/pom.xml +++ b/pom.xml @@ -98,7 +98,7 @@ under the License. 2.10.4 2.10 - 2.2.2 + 3.0.1 3.5.0 1.0.2 0.8.1 @@ -517,8 +517,6 @@ under the License. 
2.12 2.4.16 0.9.0 - 3.0.1 - 3.5.0 1.3.0 From b46632e0ce84fd11d7809be01b69772be95c93d4 Mon Sep 17 00:00:00 2001 From: Jens Kat Date: Sat, 8 Apr 2017 10:54:10 +0200 Subject: [PATCH 21/31] Add types to foreach --- .../flink/api/scala/operators/GroupCombineITCase.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala index 3839ff31a6f35..e2ffdbc3028c8 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala @@ -47,7 +47,7 @@ class GroupCombineITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa .output(new DiscardingOutputFormat[Tuple1[String]]) ds.combineGroup((in: Iterator[Tuple1[String]], out: Collector[Tuple1[String]]) => - in.toSet foreach (out.collect(_))) + in.toSet foreach ((t: Tuple1[String]) => out.collect(t))) .output(new DiscardingOutputFormat[Tuple1[String]]) // all methods on UnsortedGrouping @@ -57,7 +57,7 @@ class GroupCombineITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa ds.groupBy(0) .combineGroup((in: Iterator[Tuple1[String]], out: Collector[Tuple1[String]]) => - in.toSet foreach (out.collect(_))) + in.toSet foreach ((t: Tuple1[String]) => out.collect(t))) .output(new DiscardingOutputFormat[Tuple1[String]]) // all methods on SortedGrouping @@ -67,7 +67,7 @@ class GroupCombineITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa ds.groupBy(0).sortGroup(0, Order.ASCENDING) .combineGroup((in: Iterator[Tuple1[String]], out: Collector[Tuple1[String]]) => - in.toSet foreach (out.collect(_))) + in.toSet foreach ((t: Tuple1[String]) => out.collect(t))) .output(new DiscardingOutputFormat[Tuple1[String]]) env.execute() From 595aa45adb399493d9b44f1a618b95112aad3523 Mon Sep 17 00:00:00 2001 From: Jens Kat Date: Sat, 8 Apr 2017 10:58:56 +0200 Subject: [PATCH 22/31] make profiles better --- flink-libraries/flink-gelly-scala/pom.xml | 103 +++++--------- flink-streaming-scala/pom.xml | 157 +++++----------------- 2 files changed, 66 insertions(+), 194 deletions(-) diff --git a/flink-libraries/flink-gelly-scala/pom.xml b/flink-libraries/flink-gelly-scala/pom.xml index f588884c7d22b..de664372c2f61 100644 --- a/flink-libraries/flink-gelly-scala/pom.xml +++ b/flink-libraries/flink-gelly-scala/pom.xml @@ -103,7 +103,38 @@ under the License. + + + net.alchim31.maven + scala-maven-plugin + + + + scala-compile-first + process-resources + + compile + + + + + scala-test-compile + process-test-resources + + testCompile + + + + + + -Xms128m + -Xmx512m + + + org.apache.maven.plugins @@ -213,32 +244,7 @@ under the License. net.alchim31.maven scala-maven-plugin - - - - scala-compile-first - process-resources - - compile - - - - - - scala-test-compile - process-test-resources - - testCompile - - - - - -Xms128m - -Xmx512m - org.scalamacros @@ -252,52 +258,5 @@ under the License. 
- - scala-2.12 - - - !scala-2.10 - - - - - - - - net.alchim31.maven - scala-maven-plugin - - - - scala-compile-first - process-resources - - compile - - - - - - scala-test-compile - process-test-resources - - testCompile - - - - - - -Xms128m - -Xmx512m - - - - - - - - diff --git a/flink-streaming-scala/pom.xml b/flink-streaming-scala/pom.xml index d0ba2d9a72b4a..17e5bb4aa645a 100644 --- a/flink-streaming-scala/pom.xml +++ b/flink-streaming-scala/pom.xml @@ -118,7 +118,40 @@ under the License. + + + + net.alchim31.maven + scala-maven-plugin + + + + scala-compile-first + process-resources + + compile + + + + + scala-test-compile + process-test-resources + + testCompile + + + + + + -Xms128m + -Xmx512m + + + + org.apache.maven.plugins @@ -235,7 +268,7 @@ under the License. scala-2.10 - !scala-2.12 + scala-2.10 @@ -245,93 +278,21 @@ under the License. ${scala.macros.version} - - - - - - net.alchim31.maven - scala-maven-plugin - - - - scala-compile-first - process-resources - - compile - - - - - - scala-test-compile - process-test-resources - - testCompile - - - - - - -Xms128m - -Xmx512m - - - - org.scalamacros - paradise_${scala.version} - ${scala.macros.version} - - - - - - - - - scala-2.11 + scala !scala-2.12 - net.alchim31.maven scala-maven-plugin - - - - scala-compile-first - process-resources - - compile - - - - - - scala-test-compile - process-test-resources - - testCompile - - - - - -Xms128m - -Xmx512m - org.scalamacros @@ -346,53 +307,5 @@ under the License. - - scala-2.12 - - - !scala-2.10 - - - - - - - - net.alchim31.maven - scala-maven-plugin - - - - scala-compile-first - process-resources - - compile - - - - - - scala-test-compile - process-test-resources - - testCompile - - - - - - -Xms128m - -Xmx512m - - - - - - - - - From 954149802d27d9edd22aa4121ec7b257fdda198d Mon Sep 17 00:00:00 2001 From: Jens Kat Date: Sat, 8 Apr 2017 11:07:08 +0200 Subject: [PATCH 23/31] update chill to 2.12 version compat --- pom.xml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 836de57dea8aa..089e41c3ab79f 100644 --- a/pom.xml +++ b/pom.xml @@ -101,7 +101,7 @@ under the License. 3.0.1 3.5.0 1.0.2 - 0.8.1 + 0.8.3 5.0.4 3.4.6 2.8.0 @@ -516,7 +516,6 @@ under the License. 
2.12.1 2.12 2.4.16 - 0.9.0 1.3.0 From 3a335f8299d2b780245ca137a9bbe8a79cb37fbc Mon Sep 17 00:00:00 2001 From: Jens Kat Date: Sat, 8 Apr 2017 11:50:01 +0200 Subject: [PATCH 24/31] use jdk8 profile for travis and scala 2.12 --- .travis.yml | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/.travis.yml b/.travis.yml index 41593e50031fe..5f000aaeff293 100644 --- a/.travis.yml +++ b/.travis.yml @@ -15,11 +15,11 @@ language: java matrix: include: - jdk: "oraclejdk8" - env: PROFILE="-Dhadoop.version=2.7.2 -Dscala-2.12 -Pinclude-yarn-tests,flink-fast-tests-a,include-kinesis -Dmaven.javadoc.skip=true" + env: PROFILE="-Dhadoop.version=2.7.2 -Dscala-2.12 -Pinclude-yarn-tests,flink-fast-tests-a,include-kinesis,jdk8 -Dmaven.javadoc.skip=true" - jdk: "oraclejdk8" - env: PROFILE="-Dhadoop.version=2.7.2 -Dscala-2.12 -Pinclude-yarn-tests,flink-fast-tests-b,include-kinesis -Dmaven.javadoc.skip=true" + env: PROFILE="-Dhadoop.version=2.7.2 -Dscala-2.12 -Pinclude-yarn-tests,flink-fast-tests-b,include-kinesis,jdk8 -Dmaven.javadoc.skip=true" - jdk: "oraclejdk8" - env: PROFILE="-Dhadoop.version=2.7.2 -Dscala-2.12 -Pinclude-yarn-tests,flink-fast-tests-c,include-kinesis -Dmaven.javadoc.skip=true" + env: PROFILE="-Dhadoop.version=2.7.2 -Dscala-2.12 -Pinclude-yarn-tests,flink-fast-tests-c,include-kinesis,jdk8 -Dmaven.javadoc.skip=true" # Always run test groups A and B together - jdk: "oraclejdk8" @@ -36,12 +36,12 @@ matrix: - jdk: "oraclejdk8" env: PROFILE="-Dhadoop.version=2.6.3 -Dscala-2.10 -Pinclude-yarn-tests,flink-fast-tests-c,include-kinesis -Dmaven.javadoc.skip=true" - - jdk: "openjdk7" - env: PROFILE="-Dhadoop.version=2.4.1 -Dscala-2.12 -Pinclude-yarn-tests,flink-fast-tests-a,include-kinesis" - - jdk: "openjdk7" - env: PROFILE="-Dhadoop.version=2.4.1 -Dscala-2.12 -Pinclude-yarn-tests,flink-fast-tests-b,include-kinesis" - - jdk: "openjdk7" - env: PROFILE="-Dhadoop.version=2.4.1 -Dscala-2.12 -Pinclude-yarn-tests,flink-fast-tests-c,include-kinesis" + - jdk: "oraclejdk8" + env: PROFILE="-Dhadoop.version=2.4.1 -Dscala-2.12 -Pinclude-yarn-tests,flink-fast-tests-a,include-kinesis,jdk8" + - jdk: "oraclejdk8" + env: PROFILE="-Dhadoop.version=2.4.1 -Dscala-2.12 -Pinclude-yarn-tests,flink-fast-tests-b,include-kinesis,jdk8" + - jdk: "oraclejdk8" + env: PROFILE="-Dhadoop.version=2.4.1 -Dscala-2.12 -Pinclude-yarn-tests,flink-fast-tests-c,include-kinesis,jdk8" - jdk: "openjdk7" env: PROFILE="-Dhadoop.version=2.4.1 -Dscala-2.11 -Pinclude-yarn-tests,flink-fast-tests-a,include-kinesis" From 7c08869a819d6d97686fe0fa5e88f8fe88144165 Mon Sep 17 00:00:00 2001 From: Jens Kat Date: Sat, 8 Apr 2017 12:14:28 +0200 Subject: [PATCH 25/31] clean up --- flink-contrib/flink-streaming-contrib/pom.xml | 25 -- flink-libraries/flink-gelly-examples/pom.xml | 105 ++---- flink-libraries/flink-gelly-scala/pom.xml | 312 +++++++++--------- flink-scala-shell/pom.xml | 68 ++-- pom.xml | 2 +- 5 files changed, 219 insertions(+), 293 deletions(-) diff --git a/flink-contrib/flink-streaming-contrib/pom.xml b/flink-contrib/flink-streaming-contrib/pom.xml index 6523c8a1da6fa..16cc7cb13426f 100644 --- a/flink-contrib/flink-streaming-contrib/pom.xml +++ b/flink-contrib/flink-streaming-contrib/pom.xml @@ -193,32 +193,7 @@ under the License. 
net.alchim31.maven scala-maven-plugin - - - - scala-compile-first - process-resources - - compile - - - - - - scala-test-compile - process-test-resources - - testCompile - - - - - -Xms128m - -Xmx512m - org.scalamacros diff --git a/flink-libraries/flink-gelly-examples/pom.xml b/flink-libraries/flink-gelly-examples/pom.xml index 4117736be0c53..bf374490b0c74 100644 --- a/flink-libraries/flink-gelly-examples/pom.xml +++ b/flink-libraries/flink-gelly-examples/pom.xml @@ -94,7 +94,39 @@ + + + net.alchim31.maven + scala-maven-plugin + + + + scala-compile-first + process-resources + + compile + + + + + scala-test-compile + process-test-resources + + testCompile + + + + + + -Xms128m + -Xmx512m + + + + org.apache.maven.plugins @@ -199,32 +231,7 @@ net.alchim31.maven scala-maven-plugin - - - - scala-compile-first - process-resources - - compile - - - - - - scala-test-compile - process-test-resources - - testCompile - - - - - -Xms128m - -Xmx512m - org.scalamacros @@ -236,54 +243,6 @@ - - - - scala-2.12 - - - !scala-2.10 - - - - - - - - net.alchim31.maven - scala-maven-plugin - - - - scala-compile-first - process-resources - - compile - - - - - - scala-test-compile - process-test-resources - - testCompile - - - - - - -Xms128m - -Xmx512m - - - - - - - diff --git a/flink-libraries/flink-gelly-scala/pom.xml b/flink-libraries/flink-gelly-scala/pom.xml index de664372c2f61..1ab6813401bfa 100644 --- a/flink-libraries/flink-gelly-scala/pom.xml +++ b/flink-libraries/flink-gelly-scala/pom.xml @@ -18,91 +18,91 @@ specific language governing permissions and limitations under the License. --> - - org.apache.flink - flink-libraries - 1.3-SNAPSHOT - .. - - 4.0.0 + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + org.apache.flink + flink-libraries + 1.3-SNAPSHOT + .. + + 4.0.0 - flink-gelly-scala_2.10 - flink-gelly-scala + flink-gelly-scala_2.10 + flink-gelly-scala - jar + jar - - - - - org.apache.flink - flink-scala_2.10 - ${project.version} - provided - + - - org.apache.flink - flink-clients_2.10 - ${project.version} - provided - + + + org.apache.flink + flink-scala_2.10 + ${project.version} + provided + - - org.apache.flink - flink-gelly_2.10 - ${project.version} - + + org.apache.flink + flink-clients_2.10 + ${project.version} + provided + - + + org.apache.flink + flink-gelly_2.10 + ${project.version} + - - org.scala-lang - scala-reflect - provided - + - - org.scala-lang - scala-library - provided - + + org.scala-lang + scala-reflect + provided + - - org.scala-lang - scala-compiler - provided - + + org.scala-lang + scala-library + provided + - - org.ow2.asm - asm - ${asm.version} - provided - - - - - - org.apache.flink - flink-tests_2.10 - ${project.version} - test - test-jar - - - - org.apache.flink - flink-test-utils_2.10 - ${project.version} - test - - + + org.scala-lang + scala-compiler + provided + - - + + org.ow2.asm + asm + ${asm.version} + provided + + + + + + org.apache.flink + flink-tests_2.10 + ${project.version} + test + test-jar + + + + org.apache.flink + flink-test-utils_2.10 + ${project.version} + test + + + + + net.alchim31.maven @@ -135,69 +135,69 @@ under the License. 
- - - org.apache.maven.plugins - maven-eclipse-plugin - 2.8 - - true - - org.scala-ide.sdt.core.scalanature - org.eclipse.jdt.core.javanature - - - org.scala-ide.sdt.core.scalabuilder - - - org.scala-ide.sdt.launching.SCALA_CONTAINER - org.eclipse.jdt.launching.JRE_CONTAINER - - - org.scala-lang:scala-library - org.scala-lang:scala-compiler - - - **/*.scala - **/*.java - - - + + + org.apache.maven.plugins + maven-eclipse-plugin + 2.8 + + true + + org.scala-ide.sdt.core.scalanature + org.eclipse.jdt.core.javanature + + + org.scala-ide.sdt.core.scalabuilder + + + org.scala-ide.sdt.launching.SCALA_CONTAINER + org.eclipse.jdt.launching.JRE_CONTAINER + + + org.scala-lang:scala-library + org.scala-lang:scala-compiler + + + **/*.scala + **/*.java + + + - - - org.codehaus.mojo - build-helper-maven-plugin - 1.7 - - - - add-source - generate-sources - - add-source - - - - src/main/scala - - - - - - add-test-source - generate-test-sources - - add-test-source - - - - src/test/scala - - - - - + + + org.codehaus.mojo + build-helper-maven-plugin + 1.7 + + + + add-source + generate-sources + + add-source + + + + src/main/scala + + + + + + add-test-source + generate-test-sources + + add-test-source + + + + src/test/scala + + + + + @@ -208,26 +208,26 @@ under the License. - - - maven-assembly-plugin - - - jar-with-dependencies - - - - - make-assembly - package - - single - - - - - - + + + maven-assembly-plugin + + + jar-with-dependencies + + + + + make-assembly + package + + single + + + + + + diff --git a/flink-scala-shell/pom.xml b/flink-scala-shell/pom.xml index 5cb0996031536..f5b288c4eb9ce 100644 --- a/flink-scala-shell/pom.xml +++ b/flink-scala-shell/pom.xml @@ -16,7 +16,7 @@ specific language governing permissions and limitations under the License. --> + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> 4.0.0 @@ -39,7 +39,7 @@ under the License. com.github.scopt - scopt_${scala.binary.version} + scopt_${scala.binary.version} @@ -205,24 +205,11 @@ under the License. scala-2.10 - - - - - net.alchim31.maven - scala-maven-plugin - - - - org.scalamacros - paradise_${scala.version} - ${scala.macros.version} - - - - - - + + + scala-2.10 + + org.scalamacros @@ -238,25 +225,30 @@ under the License. - scala-2.11 + scala + + + !scala-2.12 + + - - - - net.alchim31.maven - scala-maven-plugin - - - - org.scalamacros - paradise_${scala.version} - ${scala.macros.version} - - - - - - + + + + net.alchim31.maven + scala-maven-plugin + + + + org.scalamacros + paradise_${scala.version} + ${scala.macros.version} + + + + + + diff --git a/pom.xml b/pom.xml index 089e41c3ab79f..b1d10cfdc0003 100644 --- a/pom.xml +++ b/pom.xml @@ -101,7 +101,7 @@ under the License. 3.0.1 3.5.0 1.0.2 - 0.8.3 + 0.7.7 5.0.4 3.4.6 2.8.0 From 5c6b29650d2d1011d93cc5ba5baadd80a726d769 Mon Sep 17 00:00:00 2001 From: Jens Kat Date: Sat, 8 Apr 2017 13:38:47 +0200 Subject: [PATCH 26/31] move akka version to parent pom iso profile --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index b1d10cfdc0003..f78e0ee7611aa 100644 --- a/pom.xml +++ b/pom.xml @@ -93,6 +93,7 @@ under the License. log4j-test.properties 18.0 2.3-custom + 2.4.16 1.7 2.0.1 @@ -515,7 +516,6 @@ under the License. 
2.12.1 2.12 1.3.0 From 2e480d911339d6914f19ce3abe5df6c0736743ba Mon Sep 17 00:00:00 2001 From: Jens Kat Date: Sun, 9 Apr 2017 10:10:07 +0200 Subject: [PATCH 27/31] add flag for Kafka modules --- flink-connectors/pom.xml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml index 6889e87a10ef0..98d13bac9ca0b 100644 --- a/flink-connectors/pom.xml +++ b/flink-connectors/pom.xml @@ -107,6 +107,11 @@ under the License. scala + + + !scala-2.12 + + flink-connector-kafka-0.8 flink-connector-kafka-0.9 From dbe541b997390af0152e3f6053f73b728a0a7447 Mon Sep 17 00:00:00 2001 From: Jens Kat Date: Sun, 16 Apr 2017 15:10:38 +0200 Subject: [PATCH 28/31] Bump asm version to 5.2 and implement visitMethodInsn correctly http://movingfulcrum.tumblr.com/post/80826553604/asm-framework-50-the-missing-migration-guide --- .../scala/org/apache/flink/api/scala/ClosureCleaner.scala | 4 ++-- pom.xml | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala index 53bffffe9f424..df44af8925ca0 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala @@ -262,7 +262,7 @@ class FieldAccessFinder(output: Map[Class[_], Set[String]]) extends ClassVisitor } override def visitMethodInsn(op: Int, owner: String, name: String, - desc: String) { + desc: String, itf: Boolean) { // Check for calls to a getter method for a variable in an interpreter wrapper object. // This means that the corresponding field will be accessed, so we should save it. if (op == INVOKEVIRTUAL && owner.endsWith("$iwC") && !name.endsWith("$outer")) { @@ -288,7 +288,7 @@ private[flink] class InnerClosureFinder(output: Set[Class[_]]) extends ClassVisi sig: String, exceptions: Array[String]): MethodVisitor = { new MethodVisitor(ASM5) { override def visitMethodInsn(op: Int, owner: String, name: String, - desc: String) { + desc: String, itf: Boolean) { val argTypes = Type.getArgumentTypes(desc) if (op == INVOKESPECIAL && name == "<init>" && argTypes.nonEmpty && argTypes(0).toString.startsWith("L") // is it an object? diff --git a/pom.xml b/pom.xml index f78e0ee7611aa..0f2a318ed77ff 100644 --- a/pom.xml +++ b/pom.xml @@ -103,7 +103,7 @@ under the License. 3.5.0 1.0.2 0.7.7 - 5.0.4 + 5.2 3.4.6 2.8.0 2.7.4 @@ -514,6 +514,7 @@ under the License.
+ 1.8 2.12.1 2.12 1.3.0 From 9973b2af4103e7aceb5de5f96d2e50783a5ee4ba Mon Sep 17 00:00:00 2001 From: Jens Kat Date: Sun, 16 Apr 2017 15:11:38 +0200 Subject: [PATCH 29/31] wip closurecleaner for lambdas taken from https://github.com/apache/spark/compare/master...JoshRosen:build-for-2.12 referenced in https://issues.apache.org/jira/browse/SPARK-14540 --- .../flink/api/scala/ClosureCleaner.scala | 39 +++++++++- .../scala/utils/LambdaClosureCleaner.scala | 76 +++++++++++++++++++ 2 files changed, 112 insertions(+), 3 deletions(-) create mode 100644 flink-scala/src/main/scala/org/apache/flink/api/scala/utils/LambdaClosureCleaner.scala diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala index df44af8925ca0..bec2f9b4c77e6 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala @@ -21,12 +21,12 @@ import java.io._ import org.apache.flink.annotation.Internal import org.apache.flink.api.common.InvalidProgramException +import org.apache.flink.api.scala.utils.LambdaClosureCleaner import org.apache.flink.util.InstantiationUtil import org.slf4j.LoggerFactory import scala.collection.mutable.Map import scala.collection.mutable.Set - import org.objectweb.asm.{ClassReader, ClassVisitor, MethodVisitor, Type} import org.objectweb.asm.Opcodes._ @@ -36,7 +36,7 @@ object ClosureCleaner { val LOG = LoggerFactory.getLogger(this.getClass) // Get an ASM class reader for a given class from the JAR that loaded it - private def getClassReader(cls: Class[_]): ClassReader = { + def getClassReader(cls: Class[_]): ClassReader = { // Copy data over, before delegating to ClassReader - else we can run out of open file handles. val className = cls.getName.replaceFirst("^.*\\.", "") + ".class" val resourceStream = cls.getResourceAsStream(className) @@ -50,8 +50,9 @@ object ClosureCleaner { } // Check whether a class represents a Scala closure + // since scala 2.12 anonymous functions are called Lambda, pre 2.12 anonfun. 
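+ // Lambda classes fail this check on purpose: clean() below delegates anything that is not an anonfun class to LambdaClosureCleaner.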
private def isClosure(cls: Class[_]): Boolean = { - cls.getName.contains("$anonfun$") + cls.getName.contains("$anonfun$") //|| cls.getName.contains("$Lambda$") } // Get a list of the classes of the outer objects of a given closure object, obj; @@ -110,10 +111,21 @@ object ClosureCleaner { } def clean(func: AnyRef, checkSerializable: Boolean = true) { + + if (!isClosure(func.getClass)) { + LambdaClosureCleaner.clean(func) + return + } + + if (func == null) { + return + } + // TODO: cache outerClasses / innerClasses / accessedFields val outerClasses = getOuterClasses(func) val innerClasses = getInnerClasses(func) val outerObjects = getOuterObjects(func) + val accessedFields = Map[Class[_], Set[String]]() @@ -247,6 +259,27 @@ class ReturnStatementFinder extends ClassVisitor(ASM5) { } } +private class LambdaReturnStatementFinder(targetMethodName: String) extends ClassVisitor(ASM5) { + override def visitMethod( + access: Int, + name: String, + desc: String, + sig: String, + exceptions: Array[String]): MethodVisitor = { + if (name == targetMethodName) { + new MethodVisitor(ASM5) { + override def visitTypeInsn(op: Int, tp: String) { + if (op == NEW && tp.contains("scala/runtime/NonLocalReturnControl")) { + throw new InvalidProgramException("Return statements aren't allowed in Flink closures") + } + } + } + } else { + new MethodVisitor(ASM5) {} + } + } +} + @Internal private[flink] class FieldAccessFinder(output: Map[Class[_], Set[String]]) extends ClassVisitor(ASM5) { diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/LambdaClosureCleaner.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/LambdaClosureCleaner.scala new file mode 100644 index 0000000000000..94b7af25903e5 --- /dev/null +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/LambdaClosureCleaner.scala @@ -0,0 +1,76 @@ +package org.apache.flink.api.scala.utils + +import java.lang.reflect.Method + +import org.apache.flink.api.scala.{ClosureCleaner, LambdaReturnStatementFinder} +import org.slf4j.LoggerFactory + + object LambdaClosureCleaner { + + def getSparkClassLoader: ClassLoader = getClass.getClassLoader + + def getContextOrSparkClassLoader: ClassLoader = + Option(Thread.currentThread().getContextClassLoader).getOrElse(getSparkClassLoader) + + /** Preferred alternative to Class.forName(className) */ + def classForName(className: String): Class[_] = { + Class.forName(className, true, getContextOrSparkClassLoader) + // scalastyle:on classforname + } + + val LOG = LoggerFactory.getLogger(this.getClass) + + def clean(closure: AnyRef): Unit = { + val writeReplaceMethod: Method = try { + closure.getClass.getDeclaredMethod("writeReplace") + } catch { + case e: java.lang.NoSuchMethodException => + LOG.warn("Expected a Java lambda; got " + closure.getClass.getName) + return + } + + writeReplaceMethod.setAccessible(true) + // Because we still need to support Java 7, we must use reflection here. + val serializedLambda: AnyRef = writeReplaceMethod.invoke(closure) + if (serializedLambda.getClass.getName != "java.lang.invoke.SerializedLambda") { + LOG.warn("Closure's writeReplace() method " + + s"returned ${serializedLambda.getClass.getName}, not SerializedLambda") + return + } + + val serializedLambdaClass = classForName("java.lang.invoke.SerializedLambda") + + val implClassName = serializedLambdaClass + .getDeclaredMethod("getImplClass").invoke(serializedLambda).asInstanceOf[String] + // TODO: we do not want to unconditionally strip this suffix. 
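+ // The Scala 2.12 compiler emits an extra "$adapted" bridge method when a lambda's arguments need boxing, so the suffix is stripped to reach the underlying implementation method.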
+    val implMethodName = {
+      serializedLambdaClass
+        .getDeclaredMethod("getImplMethodName").invoke(serializedLambda).asInstanceOf[String]
+        .stripSuffix("$adapted")
+    }
+    val implMethodSignature = serializedLambdaClass
+      .getDeclaredMethod("getImplMethodSignature").invoke(serializedLambda).asInstanceOf[String]
+    val capturedArgCount = serializedLambdaClass
+      .getDeclaredMethod("getCapturedArgCount").invoke(serializedLambda).asInstanceOf[Int]
+    val capturedArgs = (0 until capturedArgCount).map { argNum: Int =>
+      serializedLambdaClass
+        .getDeclaredMethod("getCapturedArg", java.lang.Integer.TYPE)
+        .invoke(serializedLambda, argNum.asInstanceOf[Object])
+    }
+    assert(capturedArgs.size == capturedArgCount)
+    val implClass = classForName(implClassName.replaceAllLiterally("/", "."))
+
+    // Fail fast if we detect return statements in closures.
+    // TODO: match the impl method based on its type signature as well, not just its name.
+    ClosureCleaner
+      .getClassReader(implClass)
+      .accept(new LambdaReturnStatementFinder(implMethodName), 0)
+
+    // Ensure the closure itself is serializable. TODO: make this check configurable via a flag.
+    ClosureCleaner.ensureSerializable(closure)
+    capturedArgs.foreach(ClosureCleaner.clean(_))
+
+    // TODO: null out fields to render the closure serializable?
+  }
+}

From ec4fb48a95587706eb321bc039e1da5ce6b7d873 Mon Sep 17 00:00:00 2001
From: Jens Kat 
Date: Sun, 16 Apr 2017 15:17:16 +0200
Subject: [PATCH 30/31] rename methods for Flink instead of Spark

---
 .../org/apache/flink/api/scala/ClosureCleaner.scala   |  3 ++-
 .../flink/api/scala/utils/LambdaClosureCleaner.scala  | 10 +++++-----
 2 files changed, 7 insertions(+), 6 deletions(-)

diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala
index bec2f9b4c77e6..9a5f385663b12 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala
@@ -259,7 +259,8 @@ class ReturnStatementFinder extends ClassVisitor(ASM5) {
   }
 }

-private class LambdaReturnStatementFinder(targetMethodName: String) extends ClassVisitor(ASM5) {
+private[flink]
+class LambdaReturnStatementFinder(targetMethodName: String) extends ClassVisitor(ASM5) {
   override def visitMethod(
       access: Int,
       name: String,

diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/LambdaClosureCleaner.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/LambdaClosureCleaner.scala
index 94b7af25903e5..6bed87e801306 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/LambdaClosureCleaner.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/LambdaClosureCleaner.scala
@@ -5,16 +5,16 @@ import java.lang.reflect.Method

 import org.apache.flink.api.scala.{ClosureCleaner, LambdaReturnStatementFinder}
 import org.slf4j.LoggerFactory

- object LambdaClosureCleaner {
+object LambdaClosureCleaner {

-  def getSparkClassLoader: ClassLoader = getClass.getClassLoader
+  def getFlinkClassLoader: ClassLoader = getClass.getClassLoader

-  def getContextOrSparkClassLoader: ClassLoader =
-    Option(Thread.currentThread().getContextClassLoader).getOrElse(getSparkClassLoader)
+  def getContextOrFlinkClassLoader: ClassLoader =
+    Option(Thread.currentThread().getContextClassLoader).getOrElse(getFlinkClassLoader)

   /** Preferred alternative to Class.forName(className) */
   def classForName(className: String): Class[_] = {
-    Class.forName(className, true, getContextOrSparkClassLoader)
+    Class.forName(className, true, getContextOrFlinkClassLoader)
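+    // (`true` above forces eager class initialization, matching the behaviour
+    // of single-argument Class.forName while honouring the context loader.)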
     // scalastyle:on classforname
   }

From 76a7395fcdf07da616c57ce00c90bcc5e4442353 Mon Sep 17 00:00:00 2001
From: Jens Kat 
Date: Mon, 17 Apr 2017 12:31:08 +0200
Subject: [PATCH 31/31] add license header

---
 .../api/scala/utils/LambdaClosureCleaner.scala | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)

diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/LambdaClosureCleaner.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/LambdaClosureCleaner.scala
index 6bed87e801306..bcca65e6c0995 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/LambdaClosureCleaner.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/LambdaClosureCleaner.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.flink.api.scala.utils

 import java.lang.reflect.Method
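Note on the mechanism the last three patches build on: on Scala 2.12 a serializable lambda exposes a private writeReplace() method that returns a java.lang.invoke.SerializedLambda, and that object is what LambdaClosureCleaner introspects. The sketch below is not part of the patch series; it is a minimal, self-contained probe of the same extraction done by hand, with invented names (SerializedLambdaProbe, offset), and the values in the output comments are only illustrative, since the generated names vary by compiler and JVM. On Scala 2.11 the getDeclaredMethod("writeReplace") call throws NoSuchMethodException, which is exactly the case the cleaner's warning branch handles.

    import java.lang.invoke.SerializedLambda

    object SerializedLambdaProbe {
      def main(args: Array[String]): Unit = {
        val offset = 10
        val f: Int => Int = x => x + offset // emitted via LambdaMetafactory on Scala 2.12

        // A serializable lambda carries a private writeReplace() method that
        // returns a SerializedLambda describing the implementing method and
        // the arguments captured from the enclosing scope.
        val writeReplace = f.getClass.getDeclaredMethod("writeReplace")
        writeReplace.setAccessible(true)
        val sl = writeReplace.invoke(f).asInstanceOf[SerializedLambda]

        println(sl.getImplClass)        // e.g. SerializedLambdaProbe$ (JVM-internal name)
        println(sl.getImplMethodName)   // e.g. $anonfun$main$1, possibly suffixed with $adapted
        println(sl.getCapturedArgCount) // 1 here: the captured local `offset`
      }
    }

The probe casts straight to SerializedLambda for brevity; the patched cleaner instead reads the same getters reflectively because, at the time of this series, Flink still had to compile against Java 7.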