From bc8a3fe24d3f85d57847d0f97decbfd4c9778a02 Mon Sep 17 00:00:00 2001 From: Richard Whitcomb Date: Thu, 21 Jan 2016 11:34:34 -0500 Subject: [PATCH 1/4] Add two functions that assist in testing a TypedPipe --- .../scala/com/twitter/scalding/Config.scala | 6 +++++ .../twitter/scalding/TypedPipeChecker.scala | 24 +++++++++++++++++++ .../scalding/TypedPipeCheckerTest.scala | 23 ++++++++++++++++++ 3 files changed, 53 insertions(+) create mode 100644 scalding-core/src/main/scala/com/twitter/scalding/TypedPipeChecker.scala create mode 100644 scalding-core/src/test/scala/com/twitter/scalding/TypedPipeCheckerTest.scala diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Config.scala b/scalding-core/src/main/scala/com/twitter/scalding/Config.scala index bf0c37d7f4..9fa95b0e2c 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/Config.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/Config.scala @@ -417,6 +417,12 @@ object Config { .setScaldingVersion .setHRavenHistoryUserName + /* + * Extensions to the Default Config to tune it for unit tests + */ + def unitTestDefault: Config = + Config(Config.default.toMap ++ Map("cascading.update.skip" -> "true")) + /** * Merge Config.default with Hadoop config from the mode (if in Hadoop mode) */ diff --git a/scalding-core/src/main/scala/com/twitter/scalding/TypedPipeChecker.scala b/scalding-core/src/main/scala/com/twitter/scalding/TypedPipeChecker.scala new file mode 100644 index 0000000000..6bd56bb1fa --- /dev/null +++ b/scalding-core/src/main/scala/com/twitter/scalding/TypedPipeChecker.scala @@ -0,0 +1,24 @@ +package com.twitter.scalding + +/** + * This class is used to assist with testing a TypedPipe + */ +object TypedPipeChecker { + /* + * Execute a TypedPipe in memory, convert the resulting Iterator to + * a list and run it through a function that makes arbitrary + * assertions on it. + */ + def checkOutput[T](output: TypedPipe[T])(assertions: List[T] => Unit) = + assertions(checkOutputInline(output)) + + /** + * Execute a TypedPipe in memory and return the result as a List + */ + def checkOutputInline[T](output: TypedPipe[T]): List[T] = + output + .toIterableExecution + .waitFor(Config.unitTestDefault, Local(strictSources = true)) + .get + .toList +} diff --git a/scalding-core/src/test/scala/com/twitter/scalding/TypedPipeCheckerTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/TypedPipeCheckerTest.scala new file mode 100644 index 0000000000..5206f1e7a2 --- /dev/null +++ b/scalding-core/src/test/scala/com/twitter/scalding/TypedPipeCheckerTest.scala @@ -0,0 +1,23 @@ +package com.twitter.scalding + +import org.scalatest.{ Matchers, WordSpec } + +class TypedPipeCheckerTest extends WordSpec with Matchers { + import TypedPipeChecker._ + + "TypedPipeChecker" should { + "run asserts on pipe" in { + checkOutput(TypedPipe.from(List(1, 2, 3, 4))){ rows => + assert(rows.size == 4) + assert(rows == List(1, 2, 3, 4)) + } + } + } + + it should { + "give back a list" in { + val list = checkOutputInline(TypedPipe.from(List(1, 2, 3, 4))) + assert(list == List(1, 2, 3, 4)) + } + } +} From 047e70e7671e96a98dd38ccc0f16e3c36cc52ec6 Mon Sep 17 00:00:00 2001 From: Richard Whitcomb Date: Thu, 21 Jan 2016 11:59:39 -0500 Subject: [PATCH 2/4] Add a function that also allows for a transform for the typed pipe --- .../com/twitter/scalding/TypedPipeChecker.scala | 12 ++++++++++-- .../com/twitter/scalding/TypedPipeCheckerTest.scala | 10 ++++++++++ 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/TypedPipeChecker.scala b/scalding-core/src/main/scala/com/twitter/scalding/TypedPipeChecker.scala index 6bd56bb1fa..6b84872eed 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/TypedPipeChecker.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/TypedPipeChecker.scala @@ -4,6 +4,14 @@ package com.twitter.scalding * This class is used to assist with testing a TypedPipe */ object TypedPipeChecker { + + /* + * Takes a List and a transform function. + * The resulting TypedPipe form the transform will be run through asserts + */ + def checkOutputTransform[T, U](input: List[T])(transform: TypedPipe[T] => TypedPipe[U])(assertions: List[U] => Unit) = + assertions(checkOutputInline(transform(TypedPipe.from(input)))) + /* * Execute a TypedPipe in memory, convert the resulting Iterator to * a list and run it through a function that makes arbitrary @@ -13,8 +21,8 @@ object TypedPipeChecker { assertions(checkOutputInline(output)) /** - * Execute a TypedPipe in memory and return the result as a List - */ + * Execute a TypedPipe in memory and return the result as a List + */ def checkOutputInline[T](output: TypedPipe[T]): List[T] = output .toIterableExecution diff --git a/scalding-core/src/test/scala/com/twitter/scalding/TypedPipeCheckerTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/TypedPipeCheckerTest.scala index 5206f1e7a2..c65742b898 100644 --- a/scalding-core/src/test/scala/com/twitter/scalding/TypedPipeCheckerTest.scala +++ b/scalding-core/src/test/scala/com/twitter/scalding/TypedPipeCheckerTest.scala @@ -20,4 +20,14 @@ class TypedPipeCheckerTest extends WordSpec with Matchers { assert(list == List(1, 2, 3, 4)) } } + + it should { + "allow for a list of input to be run through a transform function" in { + def transform(pipe: TypedPipe[Int]) = pipe.map(identity) + + checkOutputTransform(List(1, 2, 3))(transform){ rows => + assert(rows == List(1, 2, 3)) + } + } + } } From 74dc583b1e69275acbb2a0cde22af4c1201414e5 Mon Sep 17 00:00:00 2001 From: Richard Whitcomb Date: Thu, 21 Jan 2016 12:48:18 -0500 Subject: [PATCH 3/4] Make return type generic --- .../main/scala/com/twitter/scalding/TypedPipeChecker.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/TypedPipeChecker.scala b/scalding-core/src/main/scala/com/twitter/scalding/TypedPipeChecker.scala index 6b84872eed..17b41cd986 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/TypedPipeChecker.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/TypedPipeChecker.scala @@ -9,7 +9,7 @@ object TypedPipeChecker { * Takes a List and a transform function. * The resulting TypedPipe form the transform will be run through asserts */ - def checkOutputTransform[T, U](input: List[T])(transform: TypedPipe[T] => TypedPipe[U])(assertions: List[U] => Unit) = + def checkOutputTransform[T, U, R](input: List[T])(transform: TypedPipe[T] => TypedPipe[U])(assertions: List[U] => R): R = assertions(checkOutputInline(transform(TypedPipe.from(input)))) /* @@ -17,7 +17,7 @@ object TypedPipeChecker { * a list and run it through a function that makes arbitrary * assertions on it. */ - def checkOutput[T](output: TypedPipe[T])(assertions: List[T] => Unit) = + def checkOutput[T, R](output: TypedPipe[T])(assertions: List[T] => R): R = assertions(checkOutputInline(output)) /** From 9a95370d700a61108265d1fd5683e40958198067 Mon Sep 17 00:00:00 2001 From: Richard Whitcomb Date: Thu, 21 Jan 2016 16:47:51 -0500 Subject: [PATCH 4/4] Rename checkOutputInline --- .../main/scala/com/twitter/scalding/TypedPipeChecker.scala | 6 +++--- .../scala/com/twitter/scalding/TypedPipeCheckerTest.scala | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/TypedPipeChecker.scala b/scalding-core/src/main/scala/com/twitter/scalding/TypedPipeChecker.scala index 17b41cd986..6a4d342e40 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/TypedPipeChecker.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/TypedPipeChecker.scala @@ -10,7 +10,7 @@ object TypedPipeChecker { * The resulting TypedPipe form the transform will be run through asserts */ def checkOutputTransform[T, U, R](input: List[T])(transform: TypedPipe[T] => TypedPipe[U])(assertions: List[U] => R): R = - assertions(checkOutputInline(transform(TypedPipe.from(input)))) + assertions(inMemoryToList(transform(TypedPipe.from(input)))) /* * Execute a TypedPipe in memory, convert the resulting Iterator to @@ -18,12 +18,12 @@ object TypedPipeChecker { * assertions on it. */ def checkOutput[T, R](output: TypedPipe[T])(assertions: List[T] => R): R = - assertions(checkOutputInline(output)) + assertions(inMemoryToList(output)) /** * Execute a TypedPipe in memory and return the result as a List */ - def checkOutputInline[T](output: TypedPipe[T]): List[T] = + def inMemoryToList[T](output: TypedPipe[T]): List[T] = output .toIterableExecution .waitFor(Config.unitTestDefault, Local(strictSources = true)) diff --git a/scalding-core/src/test/scala/com/twitter/scalding/TypedPipeCheckerTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/TypedPipeCheckerTest.scala index c65742b898..0329199716 100644 --- a/scalding-core/src/test/scala/com/twitter/scalding/TypedPipeCheckerTest.scala +++ b/scalding-core/src/test/scala/com/twitter/scalding/TypedPipeCheckerTest.scala @@ -16,7 +16,7 @@ class TypedPipeCheckerTest extends WordSpec with Matchers { it should { "give back a list" in { - val list = checkOutputInline(TypedPipe.from(List(1, 2, 3, 4))) + val list = inMemoryToList(TypedPipe.from(List(1, 2, 3, 4))) assert(list == List(1, 2, 3, 4)) } }