diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Config.scala b/scalding-core/src/main/scala/com/twitter/scalding/Config.scala index bf0c37d7f4..9fa95b0e2c 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/Config.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/Config.scala @@ -417,6 +417,12 @@ object Config { .setScaldingVersion .setHRavenHistoryUserName + /* + * Extensions to the Default Config to tune it for unit tests + */ + def unitTestDefault: Config = + Config(Config.default.toMap ++ Map("cascading.update.skip" -> "true")) + /** * Merge Config.default with Hadoop config from the mode (if in Hadoop mode) */ diff --git a/scalding-core/src/main/scala/com/twitter/scalding/TypedPipeChecker.scala b/scalding-core/src/main/scala/com/twitter/scalding/TypedPipeChecker.scala new file mode 100644 index 0000000000..6a4d342e40 --- /dev/null +++ b/scalding-core/src/main/scala/com/twitter/scalding/TypedPipeChecker.scala @@ -0,0 +1,32 @@ +package com.twitter.scalding + +/** + * This class is used to assist with testing a TypedPipe + */ +object TypedPipeChecker { + + /* + * Takes a List and a transform function. + * The resulting TypedPipe form the transform will be run through asserts + */ + def checkOutputTransform[T, U, R](input: List[T])(transform: TypedPipe[T] => TypedPipe[U])(assertions: List[U] => R): R = + assertions(inMemoryToList(transform(TypedPipe.from(input)))) + + /* + * Execute a TypedPipe in memory, convert the resulting Iterator to + * a list and run it through a function that makes arbitrary + * assertions on it. + */ + def checkOutput[T, R](output: TypedPipe[T])(assertions: List[T] => R): R = + assertions(inMemoryToList(output)) + + /** + * Execute a TypedPipe in memory and return the result as a List + */ + def inMemoryToList[T](output: TypedPipe[T]): List[T] = + output + .toIterableExecution + .waitFor(Config.unitTestDefault, Local(strictSources = true)) + .get + .toList +} diff --git a/scalding-core/src/test/scala/com/twitter/scalding/TypedPipeCheckerTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/TypedPipeCheckerTest.scala new file mode 100644 index 0000000000..0329199716 --- /dev/null +++ b/scalding-core/src/test/scala/com/twitter/scalding/TypedPipeCheckerTest.scala @@ -0,0 +1,33 @@ +package com.twitter.scalding + +import org.scalatest.{ Matchers, WordSpec } + +class TypedPipeCheckerTest extends WordSpec with Matchers { + import TypedPipeChecker._ + + "TypedPipeChecker" should { + "run asserts on pipe" in { + checkOutput(TypedPipe.from(List(1, 2, 3, 4))){ rows => + assert(rows.size == 4) + assert(rows == List(1, 2, 3, 4)) + } + } + } + + it should { + "give back a list" in { + val list = inMemoryToList(TypedPipe.from(List(1, 2, 3, 4))) + assert(list == List(1, 2, 3, 4)) + } + } + + it should { + "allow for a list of input to be run through a transform function" in { + def transform(pipe: TypedPipe[Int]) = pipe.map(identity) + + checkOutputTransform(List(1, 2, 3))(transform){ rows => + assert(rows == List(1, 2, 3)) + } + } + } +}