From b32d1c8b47b907d7871e805422595f51f9d2896b Mon Sep 17 00:00:00 2001
From: Sean Owen
Date: Mon, 13 Jul 2015 19:51:59 +0100
Subject: [PATCH 1/3] Add predictProbabilities to Naive Bayes, return class probabilities

---
 .../mllib/classification/NaiveBayes.scala     | 76 +++++++++++++++----
 .../classification/NaiveBayesSuite.scala      | 53 ++++++++++++-
 2 files changed, 111 insertions(+), 18 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
index f51ee36d0dfcb..9e379d7d74b2f 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
@@ -93,26 +93,70 @@ class NaiveBayesModel private[mllib] (
   override def predict(testData: Vector): Double = {
     modelType match {
       case Multinomial =>
-        val prob = thetaMatrix.multiply(testData)
-        BLAS.axpy(1.0, piVector, prob)
-        labels(prob.argmax)
+        labels(multinomialCalculation(testData).argmax)
       case Bernoulli =>
-        testData.foreachActive { (index, value) =>
-          if (value != 0.0 && value != 1.0) {
-            throw new SparkException(
-              s"Bernoulli naive Bayes requires 0 or 1 feature values but found $testData.")
-          }
-        }
-        val prob = thetaMinusNegTheta.get.multiply(testData)
-        BLAS.axpy(1.0, piVector, prob)
-        BLAS.axpy(1.0, negThetaSum.get, prob)
-        labels(prob.argmax)
-      case _ =>
-        // This should never happen.
-        throw new UnknownError(s"Invalid modelType: $modelType.")
+        labels(bernoulliCalculation(testData).argmax)
     }
   }
 
+  /**
+   * Predict values for the given data set using the model trained.
+   *
+   * @param testData RDD representing data points to be predicted
+   * @return an RDD[Vector] where each entry contains the predicted posterior class probabilities,
+   *         in the same order as class labels
+   */
+  def predictProbabilities(testData: RDD[Vector]): RDD[Vector] = {
+    val bcModel = testData.context.broadcast(this)
+    testData.mapPartitions { iter =>
+      val model = bcModel.value
+      iter.map(model.predictProbabilities)
+    }
+  }
+
+  /**
+   * Predict posterior class probabilities for a single data point using the model trained.
+   *
+   * @param testData array representing a single data point
+   * @return predicted posterior class probabilities from the trained model,
+   *         in the same order as class labels
+   */
+  def predictProbabilities(testData: Vector): Vector = {
+    modelType match {
+      case Multinomial =>
+        posteriorProbabilities(multinomialCalculation(testData))
+      case Bernoulli =>
+        posteriorProbabilities(bernoulliCalculation(testData))
+    }
+  }
+
+  private def multinomialCalculation(testData: Vector) = {
+    val prob = thetaMatrix.multiply(testData)
+    BLAS.axpy(1.0, piVector, prob)
+    prob
+  }
+
+  private def bernoulliCalculation(testData: Vector) = {
+    testData.foreachActive((_, value) =>
+      if (value != 0.0 && value != 1.0) {
+        throw new SparkException(
+          s"Bernoulli naive Bayes requires 0 or 1 feature values but found $testData.")
+      }
+    )
+    val prob = thetaMinusNegTheta.get.multiply(testData)
+    BLAS.axpy(1.0, piVector, prob)
+    BLAS.axpy(1.0, negThetaSum.get, prob)
+    prob
+  }
+
+  private def posteriorProbabilities(logProb: DenseVector) = {
+    val logProbArray = logProb.toArray
+    val maxLog = logProbArray.max
+    val scaledProbs = logProbArray.map(lp => math.exp(lp - maxLog))
+    val probSum = scaledProbs.sum
+    new DenseVector(scaledProbs.map(_ / probSum))
+  }
+
   override def save(sc: SparkContext, path: String): Unit = {
     val data = NaiveBayesModel.SaveLoadV2_0.Data(labels, pi, theta, modelType)
     NaiveBayesModel.SaveLoadV2_0.save(sc, path, data)
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala
index f7fc8730606af..c92d4d833cbc6 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala
@@ -19,13 +19,14 @@ package org.apache.spark.mllib.classification
 
 import scala.util.Random
 
-import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, argmax => brzArgmax, sum => brzSum}
+import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, Vector => BV}
 import breeze.stats.distributions.{Multinomial => BrzMultinomial}
 
 import org.apache.spark.{SparkException, SparkFunSuite}
-import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.linalg.{Vector, Vectors}
 import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.mllib.util.{LocalClusterSparkContext, MLlibTestSparkContext}
+import org.apache.spark.mllib.util.TestingUtils._
 import org.apache.spark.util.Utils
 
 object NaiveBayesSuite {
@@ -154,6 +155,28 @@ class NaiveBayesSuite extends SparkFunSuite with MLlibTestSparkContext {
 
     // Test prediction on Array.
     validatePrediction(validationData.map(row => model.predict(row.features)), validationData)
+
+    // Test posteriors
+    validationData.map(_.features).foreach { features =>
+      val predicted = model.predictProbabilities(features).toArray
+      val expected = expectedMultinomialProbabilities(model, features)
+      expected.zip(predicted).foreach { case (e, p) => assert(e ~== p relTol 1.0e-10) }
+    }
+  }
+
+  /**
+   * @param model Multinomial Naive Bayes model
+   * @param testData input to compute posterior probabilities for
+   * @return posterior class probabilities (in order of labels) for input
+   */
+  private def expectedMultinomialProbabilities(model: NaiveBayesModel, testData: Vector) = {
+    val piVector = new BDV(model.pi)
+    // model.labels is row-major; treat it as col-major representation of transpose, and transpose:
+    val thetaMatrix = new BDM(model.theta(0).length, model.theta.length, model.theta.flatten).t
+    val logClassProbs: BV[Double] = piVector + (thetaMatrix * testData.toBreeze)
+    val classProbs = logClassProbs.toArray.map(math.exp)
+    val classProbsSum = classProbs.sum
+    classProbs.map(_ / classProbsSum)
   }
 
   test("Naive Bayes Bernoulli") {
@@ -182,6 +205,32 @@ class NaiveBayesSuite extends SparkFunSuite with MLlibTestSparkContext {
 
     // Test prediction on Array.
     validatePrediction(validationData.map(row => model.predict(row.features)), validationData)
+
+    // Test posteriors
+    validationData.map(_.features).foreach { features =>
+      val predicted = model.predictProbabilities(features).toArray
+      val expected = expectedBernoulliProbabilities(model, features)
+      expected.zip(predicted).foreach { case (e, p) => assert(e ~== p relTol 1.0e-10) }
+    }
+  }
+
+  /**
+   * @param model Bernoulli Naive Bayes model
+   * @param testData input to compute posterior probabilities for
+   * @return posterior class probabilities (in order of labels) for input
+   */
+  private def expectedBernoulliProbabilities(model: NaiveBayesModel, testData: Vector) = {
+    val piVector = new BDV(model.pi)
+    val thetaMatrix = new BDM(model.theta(0).length, model.theta.length, model.theta.flatten).t
+    val negThetaMatrix = new BDM(model.theta(0).length, model.theta.length,
+      model.theta.flatten.map(v => math.log(1.0 - math.exp(v)))).t
+    val testBreeze = testData.toBreeze
+    val negTestBreeze = new BDV(Array.fill(testBreeze.size)(1.0)) - testBreeze
+    val piTheta: BV[Double] = piVector + (thetaMatrix * testBreeze)
+    val logClassProbs: BV[Double] = piTheta + (negThetaMatrix * negTestBreeze)
+    val classProbs = logClassProbs.toArray.map(math.exp)
+    val classProbsSum = classProbs.sum
+    classProbs.map(_ / classProbsSum)
   }
 
   test("detect negative values") {

From 95d91fb1b2cf16758be8214ad53bb300dd5c9a65 Mon Sep 17 00:00:00 2001
From: Sean Owen
Date: Tue, 14 Jul 2015 07:58:23 +0100
Subject: [PATCH 2/3] Check that predicted probabilities sum to 1

---
 .../org/apache/spark/mllib/classification/NaiveBayesSuite.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala
index c92d4d833cbc6..7be3e8deff129 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala
@@ -159,6 +159,7 @@ class NaiveBayesSuite extends SparkFunSuite with MLlibTestSparkContext {
     // Test posteriors
     validationData.map(_.features).foreach { features =>
       val predicted = model.predictProbabilities(features).toArray
+      assert(predicted.sum ~== 1.0 relTol 1.0e-10)
       val expected = expectedMultinomialProbabilities(model, features)
       expected.zip(predicted).foreach { case (e, p) => assert(e ~== p relTol 1.0e-10) }
     }
@@ -209,6 +210,7 @@ class NaiveBayesSuite extends SparkFunSuite with MLlibTestSparkContext {
     // Test posteriors
     validationData.map(_.features).foreach { features =>
       val predicted = model.predictProbabilities(features).toArray
+      assert(predicted.sum ~== 1.0 relTol 1.0e-10)
       val expected = expectedBernoulliProbabilities(model, features)
       expected.zip(predicted).foreach { case (e, p) => assert(e ~== p relTol 1.0e-10) }
     }

From 23d5a76859ab033ede588e3ea1da1f4cc03ccb51 Mon Sep 17 00:00:00 2001
From: Sean Owen
Date: Tue, 14 Jul 2015 21:58:52 +0100
Subject: [PATCH 3/3] Fix model.labels -> model.theta

---
 .../org/apache/spark/mllib/classification/NaiveBayesSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala
index 7be3e8deff129..cffa1ab700f80 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala
@@ -172,7 +172,7 @@ class NaiveBayesSuite extends SparkFunSuite with MLlibTestSparkContext {
    */
   private def expectedMultinomialProbabilities(model: NaiveBayesModel, testData: Vector) = {
     val piVector = new BDV(model.pi)
-    // model.labels is row-major; treat it as col-major representation of transpose, and transpose:
+    // model.theta is row-major; treat it as col-major representation of transpose, and transpose:
     val thetaMatrix = new BDM(model.theta(0).length, model.theta.length, model.theta.flatten).t
     val logClassProbs: BV[Double] = piVector + (thetaMatrix * testData.toBreeze)
     val classProbs = logClassProbs.toArray.map(math.exp)
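Usage sketch (not part of the patch series above): the snippet below shows how the predictProbabilities API introduced in PATCH 1/3 might be exercised once the patches are applied. The toy dataset, smoothing parameter, and local Spark setup are illustrative assumptions, not taken from the patches.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.classification.NaiveBayes
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

object NaiveBayesPosteriorExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("NaiveBayesPosteriorExample").setMaster("local[2]"))

    // Toy two-class multinomial data: each feature is a term count.
    val training = sc.parallelize(Seq(
      LabeledPoint(0.0, Vectors.dense(4.0, 0.0, 1.0)),
      LabeledPoint(0.0, Vectors.dense(3.0, 1.0, 0.0)),
      LabeledPoint(1.0, Vectors.dense(0.0, 5.0, 2.0)),
      LabeledPoint(1.0, Vectors.dense(1.0, 4.0, 3.0))))

    val model = NaiveBayes.train(training, lambda = 1.0, modelType = "multinomial")

    val testPoint = Vectors.dense(1.0, 3.0, 1.0)
    // New in this patch series: posterior class probabilities, ordered like model.labels.
    val posterior = model.predictProbabilities(testPoint).toArray
    println(s"labels = ${model.labels.mkString(", ")}; posterior = ${posterior.mkString(", ")}")

    // The posteriors form a distribution, and their argmax agrees with predict().
    require(math.abs(posterior.sum - 1.0) < 1e-9)
    require(model.labels(posterior.indexOf(posterior.max)) == model.predict(testPoint))

    sc.stop()
  }
}

The RDD overload added in the same patch, predictProbabilities(testData: RDD[Vector]), scores points in bulk and returns the probabilities in the same label order.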