From 0bfc365bedd0116bb44ff64414e8a2216b5667d7 Mon Sep 17 00:00:00 2001 From: Paavo Date: Tue, 9 Jun 2015 10:59:11 +0900 Subject: [PATCH 1/5] Test case for empty stream. --- .../StreamingLinearRegressionSuite.scala | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala index 9a379406d5061..f5e2d31056cbd 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala @@ -166,4 +166,22 @@ class StreamingLinearRegressionSuite extends SparkFunSuite with TestSuiteBase { val error = output.map(batch => batch.map(p => math.abs(p._1 - p._2)).sum / nPoints).toList assert((error.head - error.last) > 2) } + + // Test empty RDDs in a stream + test("handling empty RDDs in a stream") { + val model = new StreamingLinearRegressionWithSGD() + .setInitialWeights(Vectors.dense(0.0, 0.0)) + .setStepSize(0.2) + .setNumIterations(25) + val numBatches = 10 + val nPoints = 100 + val emptyInput = Seq.empty[Seq[LabeledPoint]] + val ssc = setupStreams(emptyInput, + (inputDStream: DStream[LabeledPoint]) => { + model.trainOn(inputDStream) + model.predictOnValues(inputDStream.map(x => (x.label, x.features))) + } + ) + val output: Seq[Seq[(Double, Double)]] = runStreams(ssc, numBatches, numBatches) + } } From 393e36f00dd456eed3e00cd25c0cfa0951991de6 Mon Sep 17 00:00:00 2001 From: Paavo Date: Tue, 9 Jun 2015 11:19:15 +0900 Subject: [PATCH 2/5] Ignore empty RDDs. --- .../regression/StreamingLinearAlgorithm.scala | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala index aee51bf22d8d0..28b6daa73668a 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala @@ -83,13 +83,15 @@ abstract class StreamingLinearAlgorithm[ throw new IllegalArgumentException("Model must be initialized before starting training.") } data.foreachRDD { (rdd, time) => - model = Some(algorithm.run(rdd, model.get.weights)) - logInfo("Model updated at time %s".format(time.toString)) - val display = model.get.weights.size match { - case x if x > 100 => model.get.weights.toArray.take(100).mkString("[", ",", "...") - case _ => model.get.weights.toArray.mkString("[", ",", "]") + if (rdd.toLocalIterator.nonEmpty) { + model = Some(algorithm.run(rdd, model.get.weights)) + logInfo("Model updated at time %s".format(time.toString)) + val display = model.get.weights.size match { + case x if x > 100 => model.get.weights.toArray.take(100).mkString("[", ",", "...") + case _ => model.get.weights.toArray.mkString("[", ",", "]") + } + logInfo("Current model: weights, %s".format (display)) } - logInfo("Current model: weights, %s".format (display)) } } From 54ad89ead9908559a79ae60e2df1a97fd980ea2a Mon Sep 17 00:00:00 2001 From: Paavo Date: Tue, 9 Jun 2015 11:27:39 +0900 Subject: [PATCH 3/5] Test case for empty stream. --- .../StreamingLogisticRegressionSuite.scala | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionSuite.scala index e98b61e13e21f..fd653296c9d97 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionSuite.scala @@ -158,4 +158,21 @@ class StreamingLogisticRegressionSuite extends SparkFunSuite with TestSuiteBase val error = output.map(batch => batch.map(p => math.abs(p._1 - p._2)).sum / nPoints).toList assert(error.head > 0.8 & error.last < 0.2) } + + // Test empty RDDs in a stream + test("handling empty RDDs in a stream") { + val model = new StreamingLogisticRegressionWithSGD() + .setInitialWeights(Vectors.dense(-0.1)) + .setStepSize(0.01) + .setNumIterations(10) + val numBatches = 10 + val emptyInput = Seq.empty[Seq[LabeledPoint]] + val ssc = setupStreams(emptyInput, + (inputDStream: DStream[LabeledPoint]) => { + model.trainOn(inputDStream) + model.predictOnValues(inputDStream.map(x => (x.label, x.features))) + } + ) + val output: Seq[Seq[(Double, Double)]] = runStreams(ssc, numBatches, numBatches) + } } From db234cf0eb7d7b377c78151a4c98cc6ba1320282 Mon Sep 17 00:00:00 2001 From: Paavo Date: Wed, 10 Jun 2015 08:01:44 +0900 Subject: [PATCH 4/5] Use !rdd.isEmpty. --- .../spark/mllib/regression/StreamingLinearAlgorithm.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala index 28b6daa73668a..8de20df643c5a 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala @@ -83,7 +83,7 @@ abstract class StreamingLinearAlgorithm[ throw new IllegalArgumentException("Model must be initialized before starting training.") } data.foreachRDD { (rdd, time) => - if (rdd.toLocalIterator.nonEmpty) { + if (!rdd.isEmpty) { model = Some(algorithm.run(rdd, model.get.weights)) logInfo("Model updated at time %s".format(time.toString)) val display = model.get.weights.size match { From ff5cd78e2973ff5ea925451a0d875e85706675dc Mon Sep 17 00:00:00 2001 From: Paavo Date: Wed, 10 Jun 2015 08:07:07 +0900 Subject: [PATCH 5/5] Update strings to use interpolation. --- .../spark/mllib/regression/StreamingLinearAlgorithm.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala index 8de20df643c5a..141052ba813ee 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala @@ -85,12 +85,12 @@ abstract class StreamingLinearAlgorithm[ data.foreachRDD { (rdd, time) => if (!rdd.isEmpty) { model = Some(algorithm.run(rdd, model.get.weights)) - logInfo("Model updated at time %s".format(time.toString)) + logInfo(s"Model updated at time ${time.toString}") val display = model.get.weights.size match { case x if x > 100 => model.get.weights.toArray.take(100).mkString("[", ",", "...") case _ => model.get.weights.toArray.mkString("[", ",", "]") } - logInfo("Current model: weights, %s".format (display)) + logInfo(s"Current model: weights, ${display}") } } }