From 70a0fc42e8aad7ca78cd97d1a77170a7d422799d Mon Sep 17 00:00:00 2001 From: MechCoder Date: Mon, 20 Jul 2015 13:38:00 +0530 Subject: [PATCH 1/8] [SPARK-9112] [ML] Implement Stats for LogisticRegression --- .../classification/LogisticRegression.scala | 100 +++++++++++++++++- .../LogisticRegressionSuite.scala | 15 +++ 2 files changed, 113 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala index 8fc9199fb4602..e28c9a52edaaf 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala @@ -30,10 +30,12 @@ import org.apache.spark.ml.util.Identifiable import org.apache.spark.mllib.linalg._ import org.apache.spark.mllib.linalg.BLAS._ import org.apache.spark.mllib.regression.LabeledPoint +import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer import org.apache.spark.mllib.util.MLUtils import org.apache.spark.rdd.RDD -import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.{DataFrame, Row} +import org.apache.spark.sql.functions.{col, udf} import org.apache.spark.storage.StorageLevel /** @@ -252,7 +254,13 @@ class LogisticRegression(override val uid: String) if (handlePersistence) instances.unpersist() - copyValues(new LogisticRegressionModel(uid, weights, intercept)) + val model = copyValues(new LogisticRegressionModel(uid, weights, intercept)) + val logRegSummary = new LogisticRegressionTrainingSummary( + model.transform(dataset), + $(probabilityCol), + $(labelCol), + objectiveHistory) + model.setSummary(logRegSummary) } override def copy(extra: ParamMap): LogisticRegression = defaultCopy(extra) @@ -286,6 +294,40 @@ class LogisticRegressionModel private[ml] ( override val numClasses: Int = 2 + private var trainingSummary: Option[LogisticRegressionTrainingSummary] = None + + /** + * Gets summary (e.g. residuals, mse, r-squared ) of model on training set. An exception is + * thrown if `trainingSummary == None`. + */ + def summary: LogisticRegressionTrainingSummary = trainingSummary match { + case Some(summ) => summ + case None => + throw new SparkException( + "No training summary available for this LinearRegressionModel", + new NullPointerException()) + } + + private[classification] def setSummary(summary: LogisticRegressionTrainingSummary): this.type = { + this.trainingSummary = Some(summary) + this + } + + /** Indicates whether a training summary exists for this model instance. */ + def hasSummary: Boolean = trainingSummary.isDefined + + /** + * Evaluates the model on a testset. + * @param dataset Test dataset to evaluate model on. + */ + // TODO: decide on a good name before exposing to public API + def evaluate(dataset: DataFrame): LogisticRegressionSummary = { + val t = udf { features: Vector => raw2probabilityInPlace(predictRaw(features)) } + val labelsAndScores = dataset. + select(col($(labelCol)), t(col($(featuresCol))).as($(probabilityCol))) + new LogisticRegressionSummary(labelsAndScores, $(probabilityCol), $(labelCol)) + } + /** * Predict label for the given feature vector. * The behavior of this can be adjusted using [[threshold]]. 
@@ -407,6 +449,60 @@ private[classification] class MultiClassSummarizer extends Serializable { } } +@Experimental +class LogisticRegressionTrainingSummary private[classification] ( + predictions: DataFrame, + probabilityCol: String, + labelCol: String, + val objectiveHistory: Array[Double]) + extends LogisticRegressionSummary(predictions, probabilityCol, labelCol) { + + /** Number of training iterations until termination */ + val totalIterations = objectiveHistory.length + +} + +@Experimental +class LogisticRegressionSummary private[classification] ( + @transient val predictions: DataFrame, + val probabilityCol: String, + val labelCol: String) extends Serializable { + + @transient val metrics = new BinaryClassificationMetrics( + predictions.select(probabilityCol, labelCol).map { + case Row(score: Vector, label: Double) => (score(1), label) + } + ) + + /** + * Returns the receiver operating characteristic (ROC) curve, + * which is an RDD of (false positive rate, true positive rate) + * with (0.0, 0.0) prepended and (1.0, 1.0) appended to it. + */ + val roc: RDD[(Double, Double)] = metrics.roc() + + /** + * Computes the area under the receiver operating characteristic (ROC) curve. + */ + val areaUnderROC: Double = metrics.areaUnderROC() + + /** + * Returns the precision-recall curve, which is an RDD of (recall, precision), + * NOT (precision, recall), with (0.0, 1.0) prepended to it. + */ + val pr: RDD[(Double, Double)] = metrics.pr() + + /** Returns the (threshold, F-Measure) curve with beta = 1.0. */ + val fMeasureByThreshold: RDD[(Double, Double)] = metrics.fMeasureByThreshold() + + /** Returns the (threshold, precision) curve. */ + val precisionByThreshold: RDD[(Double, Double)] = metrics.precisionByThreshold() + + /** Returns the (threshold, recall) curve. */ + val recallByThreshold: RDD[(Double, Double)] = metrics.recallByThreshold() + +} + /** * LogisticAggregator computes the gradient and loss for binary logistic loss function, as used * in binary classification for samples in sparse or dense vector in a online fashion. diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala index b7dd44753896a..818b6f717e631 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala @@ -701,4 +701,19 @@ class LogisticRegressionSuite extends SparkFunSuite with MLlibTestSparkContext { assert(model1.intercept ~== interceptR relTol 1E-5) assert(model1.weights ~= weightsR absTol 1E-6) } + + test("evaluate on test set") { + + // Evaluate on test set should be same as that of the transformed training data. 
+ val lr = new LogisticRegression() + .setMaxIter(10) + .setRegParam(1.0) + .setThreshold(0.6) + val model = lr.fit(dataset) + val summary = model.summary + + val sameSummary = model.evaluate(dataset) + assert(summary.areaUnderROC === sameSummary.areaUnderROC) + + } } From fbed861eace8d0fe780ef326ee9b9164982a1ac5 Mon Sep 17 00:00:00 2001 From: MechCoder Date: Tue, 21 Jul 2015 16:20:11 +0530 Subject: [PATCH 2/8] DataFrame support for metrics --- .../classification/LogisticRegression.scala | 69 +++++++++++++++---- 1 file changed, 54 insertions(+), 15 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala index e28c9a52edaaf..1f153989ce093 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala @@ -34,7 +34,7 @@ import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer import org.apache.spark.mllib.util.MLUtils import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{DataFrame, Row} +import org.apache.spark.sql.{DataFrame, Row, SQLContext} import org.apache.spark.sql.functions.{col, udf} import org.apache.spark.storage.StorageLevel @@ -297,14 +297,14 @@ class LogisticRegressionModel private[ml] ( private var trainingSummary: Option[LogisticRegressionTrainingSummary] = None /** - * Gets summary (e.g. residuals, mse, r-squared ) of model on training set. An exception is + * Gets summary of model on training set. An exception is * thrown if `trainingSummary == None`. */ def summary: LogisticRegressionTrainingSummary = trainingSummary match { case Some(summ) => summ case None => throw new SparkException( - "No training summary available for this LinearRegressionModel", + "No training summary available for this LogisticRegressionModel", new NullPointerException()) } @@ -468,6 +468,8 @@ class LogisticRegressionSummary private[classification] ( val probabilityCol: String, val labelCol: String) extends Serializable { + /** Returns a BinaryClassificationMetrics object. + */ @transient val metrics = new BinaryClassificationMetrics( predictions.select(probabilityCol, labelCol).map { case Row(score: Vector, label: Double) => (score(1), label) @@ -476,10 +478,17 @@ class LogisticRegressionSummary private[classification] ( /** * Returns the receiver operating characteristic (ROC) curve, - * which is an RDD of (false positive rate, true positive rate) + * which is an Dataframe having two fields (false positive rate, true positive rate) * with (0.0, 0.0) prepended and (1.0, 1.0) appended to it. + * Every possible probability obtained in transforming the dataset are used + * as thresholds used in calculating the FPR and TPR. */ - val roc: RDD[(Double, Double)] = metrics.roc() + @transient val roc: DataFrame = { + val distributedRoc = metrics.roc() + val sqlContext = SQLContext.getOrCreate(distributedRoc.sparkContext) + import sqlContext.implicits._ + distributedRoc.toDF("False Positive Rate", "True Positive Rate") + } /** * Computes the area under the receiver operating characteristic (ROC) curve. @@ -487,20 +496,50 @@ class LogisticRegressionSummary private[classification] ( val areaUnderROC: Double = metrics.areaUnderROC() /** - * Returns the precision-recall curve, which is an RDD of (recall, precision), - * NOT (precision, recall), with (0.0, 1.0) prepended to it. 
+ * Returns the precision-recall curve, which is an Dataframe containing + * two fields (recall, precision) NOT (precision, recall), with (0.0, 1.0) prepended to it. + * Every possible probability obtained in transforming the dataset are used + * as thresholds used in calculating the precision and recall. */ - val pr: RDD[(Double, Double)] = metrics.pr() - - /** Returns the (threshold, F-Measure) curve with beta = 1.0. */ - val fMeasureByThreshold: RDD[(Double, Double)] = metrics.fMeasureByThreshold() + @transient val pr: DataFrame = { + val distributedPr = metrics.pr() + val sqlContext = SQLContext.getOrCreate(distributedPr.sparkContext) + import sqlContext.implicits._ + distributedPr.toDF("recall", "precision") + } - /** Returns the (threshold, precision) curve. */ - val precisionByThreshold: RDD[(Double, Double)] = metrics.precisionByThreshold() + /** Returns a dataframe with two fields (threshold, F-Measure) curve with beta = 1.0. + * Every possible probability obtained in transforming the dataset are used + * as thresholds used in calculating the F-measure. + */ + @transient val fMeasureByThreshold: DataFrame = { + val distributedFMeasure = metrics.fMeasureByThreshold() + val sqlContext = SQLContext.getOrCreate(distributedFMeasure.sparkContext) + import sqlContext.implicits._ + distributedFMeasure.toDF("threshold", "F-Measure") + } - /** Returns the (threshold, recall) curve. */ - val recallByThreshold: RDD[(Double, Double)] = metrics.recallByThreshold() + /** Returns a dataframe with two fields (threshold, precision) curve. + * Every possible probability obtained in transforming the dataset are used + * as thresholds used in calculating the precision. + */ + @transient val precisionByThreshold: DataFrame = { + val distributedPrecision = metrics.precisionByThreshold() + val sqlContext = SQLContext.getOrCreate(distributedPrecision.sparkContext) + import sqlContext.implicits._ + distributedPrecision.toDF("threshold", "precision") + } + /** Returns a dataframe with two fields (threshold, recall) curve. + * Every possible probability obtained in transforming the dataset are used + * as thresholds used in calculating the recall. 
+ */ + @transient val recallByThreshold: DataFrame = { + val distributedRecall = metrics.recallByThreshold() + val sqlContext = SQLContext.getOrCreate(distributedRecall.sparkContext) + import sqlContext.implicits._ + distributedRecall.toDF("threshold", "recall") + } } /** From 80d995424d06e353b907b9fde1c99bf81f9538a9 Mon Sep 17 00:00:00 2001 From: MechCoder Date: Tue, 21 Jul 2015 18:47:25 +0530 Subject: [PATCH 3/8] Added tests --- .../classification/LogisticRegression.scala | 2 +- .../LogisticRegressionSuite.scala | 19 ++++++++++++++++++- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala index 1f153989ce093..8e018cc81775c 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala @@ -487,7 +487,7 @@ class LogisticRegressionSummary private[classification] ( val distributedRoc = metrics.roc() val sqlContext = SQLContext.getOrCreate(distributedRoc.sparkContext) import sqlContext.implicits._ - distributedRoc.toDF("False Positive Rate", "True Positive Rate") + distributedRoc.toDF("FalsePositiveRate", "TruePositiveRate") } /** diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala index 818b6f717e631..6ab9d324d424d 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala @@ -703,7 +703,6 @@ class LogisticRegressionSuite extends SparkFunSuite with MLlibTestSparkContext { } test("evaluate on test set") { - // Evaluate on test set should be same as that of the transformed training data. 
val lr = new LogisticRegression() .setMaxIter(10) @@ -714,6 +713,24 @@ class LogisticRegressionSuite extends SparkFunSuite with MLlibTestSparkContext { val sameSummary = model.evaluate(dataset) assert(summary.areaUnderROC === sameSummary.areaUnderROC) + assert(summary.roc.collect() === sameSummary.roc.collect()) + assert(summary.pr.collect() === sameSummary.pr.collect()) + assert(summary.fMeasureByThreshold.collect() === sameSummary.fMeasureByThreshold.collect()) + assert(summary.recallByThreshold.collect() === sameSummary.recallByThreshold.collect()) + assert(summary.precisionByThreshold.collect() === sameSummary.precisionByThreshold.collect()) + } + + test("statistics on training data") { + val lr = new LogisticRegression() + .setMaxIter(10) + .setRegParam(1.0) + .setThreshold(0.6) + val model = lr.fit(dataset) + assert( + model.summary + .objectiveHistory + .sliding(2) + .forall(x => x(0) >= x(1))) } } From 640376a2c7a6b033f71b954cf44f0428cd6cfb0b Mon Sep 17 00:00:00 2001 From: MechCoder Date: Tue, 28 Jul 2015 15:53:11 +0530 Subject: [PATCH 4/8] remove unnecessary dataframe stuff and add docs --- .../classification/LogisticRegression.scala | 59 +++++++++---------- .../LogisticRegressionSuite.scala | 16 ++--- 2 files changed, 38 insertions(+), 37 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala index 8e018cc81775c..eeb0a52130dc1 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala @@ -450,6 +450,15 @@ private[classification] class MultiClassSummarizer extends Serializable { } @Experimental +/** + * :: Experimental :: + * Logistic regression training results. + * @param predictions dataframe outputted by the model's `transform` method. + * @param probabilityCol field in "predictions" which gives the calibrated probability of + * each sample as a vector. + * @param labelCol field in "predictions" which gives the true label of each sample. + * @param objectiveHistory objective function (scaled loss + regularization) at each iteration. + */ class LogisticRegressionTrainingSummary private[classification] ( predictions: DataFrame, probabilityCol: String, @@ -463,14 +472,25 @@ class LogisticRegressionTrainingSummary private[classification] ( } @Experimental +/** + * :: Experimental :: + * Logistic regression results for a given model. + * @param predictions dataframe outputted by the model's `transform` method. + * @param probabilityCol field in "predictions" which gives the calibrated probability of + * each sample. + * @param labelCol field in "predictions" which gives the true label of each sample. + */ class LogisticRegressionSummary private[classification] ( @transient val predictions: DataFrame, val probabilityCol: String, val labelCol: String) extends Serializable { + private val sqlContext = predictions.sqlContext + import sqlContext.implicits._ + /** Returns a BinaryClassificationMetrics object. 
*/ - @transient val metrics = new BinaryClassificationMetrics( + @transient private val metrics = new BinaryClassificationMetrics( predictions.select(probabilityCol, labelCol).map { case Row(score: Vector, label: Double) => (score(1), label) } @@ -483,17 +503,12 @@ class LogisticRegressionSummary private[classification] ( * Every possible probability obtained in transforming the dataset are used * as thresholds used in calculating the FPR and TPR. */ - @transient val roc: DataFrame = { - val distributedRoc = metrics.roc() - val sqlContext = SQLContext.getOrCreate(distributedRoc.sparkContext) - import sqlContext.implicits._ - distributedRoc.toDF("FalsePositiveRate", "TruePositiveRate") - } + def roc(): DataFrame = metrics.roc().toDF("FalsePositiveRate", "TruePositiveRate") /** * Computes the area under the receiver operating characteristic (ROC) curve. */ - val areaUnderROC: Double = metrics.areaUnderROC() + def areaUnderROC(): Double = metrics.areaUnderROC() /** * Returns the precision-recall curve, which is an Dataframe containing @@ -501,45 +516,29 @@ class LogisticRegressionSummary private[classification] ( * Every possible probability obtained in transforming the dataset are used * as thresholds used in calculating the precision and recall. */ - @transient val pr: DataFrame = { - val distributedPr = metrics.pr() - val sqlContext = SQLContext.getOrCreate(distributedPr.sparkContext) - import sqlContext.implicits._ - distributedPr.toDF("recall", "precision") - } + def pr(): DataFrame = metrics.pr().toDF("recall", "precision") /** Returns a dataframe with two fields (threshold, F-Measure) curve with beta = 1.0. * Every possible probability obtained in transforming the dataset are used * as thresholds used in calculating the F-measure. */ - @transient val fMeasureByThreshold: DataFrame = { - val distributedFMeasure = metrics.fMeasureByThreshold() - val sqlContext = SQLContext.getOrCreate(distributedFMeasure.sparkContext) - import sqlContext.implicits._ - distributedFMeasure.toDF("threshold", "F-Measure") + def fMeasureByThreshold(): DataFrame = { + metrics.fMeasureByThreshold().toDF("threshold", "F-Measure") } /** Returns a dataframe with two fields (threshold, precision) curve. * Every possible probability obtained in transforming the dataset are used * as thresholds used in calculating the precision. */ - @transient val precisionByThreshold: DataFrame = { - val distributedPrecision = metrics.precisionByThreshold() - val sqlContext = SQLContext.getOrCreate(distributedPrecision.sparkContext) - import sqlContext.implicits._ - distributedPrecision.toDF("threshold", "precision") + def precisionByThreshold(): DataFrame = { + metrics.precisionByThreshold().toDF("threshold", "precision") } /** Returns a dataframe with two fields (threshold, recall) curve. * Every possible probability obtained in transforming the dataset are used * as thresholds used in calculating the recall. 
*/ - @transient val recallByThreshold: DataFrame = { - val distributedRecall = metrics.recallByThreshold() - val sqlContext = SQLContext.getOrCreate(distributedRecall.sparkContext) - import sqlContext.implicits._ - distributedRecall.toDF("threshold", "recall") - } + def recallByThreshold(): DataFrame = metrics.recallByThreshold().toDF("threshold", "recall") } /** diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala index 6ab9d324d424d..2b660b4f1f538 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala @@ -699,7 +699,7 @@ class LogisticRegressionSuite extends SparkFunSuite with MLlibTestSparkContext { val weightsR = Vectors.dense(0.0, 0.0, 0.0, 0.0) assert(model1.intercept ~== interceptR relTol 1E-5) - assert(model1.weights ~= weightsR absTol 1E-6) + assert(model1.weights ~== weightsR absTol 1E-6) } test("evaluate on test set") { @@ -712,12 +712,14 @@ class LogisticRegressionSuite extends SparkFunSuite with MLlibTestSparkContext { val summary = model.summary val sameSummary = model.evaluate(dataset) - assert(summary.areaUnderROC === sameSummary.areaUnderROC) - assert(summary.roc.collect() === sameSummary.roc.collect()) - assert(summary.pr.collect() === sameSummary.pr.collect()) - assert(summary.fMeasureByThreshold.collect() === sameSummary.fMeasureByThreshold.collect()) - assert(summary.recallByThreshold.collect() === sameSummary.recallByThreshold.collect()) - assert(summary.precisionByThreshold.collect() === sameSummary.precisionByThreshold.collect()) + assert(summary.areaUnderROC() === sameSummary.areaUnderROC()) + assert(summary.roc().collect() === sameSummary.roc.collect()) + assert(summary.pr().collect() === sameSummary.pr.collect()) + assert( + summary.fMeasureByThreshold().collect() === sameSummary.fMeasureByThreshold().collect()) + assert(summary.recallByThreshold().collect() === sameSummary.recallByThreshold().collect()) + assert( + summary.precisionByThreshold().collect() === sameSummary.precisionByThreshold().collect()) } test("statistics on training data") { From 40ad8ef682b9971d44ed30411d668c8ce10c4d25 Mon Sep 17 00:00:00 2001 From: MechCoder Date: Sat, 1 Aug 2015 18:47:13 +0530 Subject: [PATCH 5/8] minor --- .../ml/classification/LogisticRegression.scala | 15 ++++++++++----- .../classification/LogisticRegressionSuite.scala | 1 + 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala index eeb0a52130dc1..d6fab56608ae7 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala @@ -321,7 +321,7 @@ class LogisticRegressionModel private[ml] ( * @param dataset Test dataset to evaluate model on. */ // TODO: decide on a good name before exposing to public API - def evaluate(dataset: DataFrame): LogisticRegressionSummary = { + private[classification] def evaluate(dataset: DataFrame): LogisticRegressionSummary = { val t = udf { features: Vector => raw2probabilityInPlace(predictRaw(features)) } val labelsAndScores = dataset. 
select(col($(labelCol)), t(col($(featuresCol))).as($(probabilityCol))) @@ -490,10 +490,12 @@ class LogisticRegressionSummary private[classification] ( /** Returns a BinaryClassificationMetrics object. */ + // TODO: Allow the user to vary the number of bins using a setBins method in + // BinaryClassificationMetrics. For now the default is set to 100. @transient private val metrics = new BinaryClassificationMetrics( predictions.select(probabilityCol, labelCol).map { case Row(score: Vector, label: Double) => (score(1), label) - } + }, 100 ) /** @@ -518,7 +520,8 @@ class LogisticRegressionSummary private[classification] ( */ def pr(): DataFrame = metrics.pr().toDF("recall", "precision") - /** Returns a dataframe with two fields (threshold, F-Measure) curve with beta = 1.0. + /** + * Returns a dataframe with two fields (threshold, F-Measure) curve with beta = 1.0. * Every possible probability obtained in transforming the dataset are used * as thresholds used in calculating the F-measure. */ @@ -526,7 +529,8 @@ class LogisticRegressionSummary private[classification] ( metrics.fMeasureByThreshold().toDF("threshold", "F-Measure") } - /** Returns a dataframe with two fields (threshold, precision) curve. + /** + * Returns a dataframe with two fields (threshold, precision) curve. * Every possible probability obtained in transforming the dataset are used * as thresholds used in calculating the precision. */ @@ -534,7 +538,8 @@ class LogisticRegressionSummary private[classification] ( metrics.precisionByThreshold().toDF("threshold", "precision") } - /** Returns a dataframe with two fields (threshold, recall) curve. + /** + * Returns a dataframe with two fields (threshold, recall) curve. * Every possible probability obtained in transforming the dataset are used * as thresholds used in calculating the recall. */ diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala index 2b660b4f1f538..d6264d217ffb3 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala @@ -723,6 +723,7 @@ class LogisticRegressionSuite extends SparkFunSuite with MLlibTestSparkContext { } test("statistics on training data") { + // Test that loss is monotonically decreasing. 
val lr = new LogisticRegression() .setMaxIter(10) .setRegParam(1.0) From 958612537d8ab41b72a976a371dcf462b6d733ba Mon Sep 17 00:00:00 2001 From: MechCoder Date: Tue, 4 Aug 2015 01:41:21 +0530 Subject: [PATCH 6/8] Add abstraction to handle Multiclass Metrics --- .../classification/LogisticRegression.scala | 94 +++++++++++++------ 1 file changed, 63 insertions(+), 31 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala index d6fab56608ae7..24d533bb55128 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala @@ -255,7 +255,7 @@ class LogisticRegression(override val uid: String) if (handlePersistence) instances.unpersist() val model = copyValues(new LogisticRegressionModel(uid, weights, intercept)) - val logRegSummary = new LogisticRegressionTrainingSummary( + val logRegSummary = new BinaryLogisticRegressionTrainingSummary( model.transform(dataset), $(probabilityCol), $(labelCol), @@ -294,13 +294,13 @@ class LogisticRegressionModel private[ml] ( override val numClasses: Int = 2 - private var trainingSummary: Option[LogisticRegressionTrainingSummary] = None + private var trainingSummary: Option[BinaryLogisticRegressionTrainingSummary] = None /** * Gets summary of model on training set. An exception is * thrown if `trainingSummary == None`. */ - def summary: LogisticRegressionTrainingSummary = trainingSummary match { + def summary: BinaryLogisticRegressionTrainingSummary = trainingSummary match { case Some(summ) => summ case None => throw new SparkException( @@ -308,7 +308,8 @@ class LogisticRegressionModel private[ml] ( new NullPointerException()) } - private[classification] def setSummary(summary: LogisticRegressionTrainingSummary): this.type = { + private[classification] def setSummary( + summary: BinaryLogisticRegressionTrainingSummary): this.type = { this.trainingSummary = Some(summary) this } @@ -321,11 +322,11 @@ class LogisticRegressionModel private[ml] ( * @param dataset Test dataset to evaluate model on. */ // TODO: decide on a good name before exposing to public API - private[classification] def evaluate(dataset: DataFrame): LogisticRegressionSummary = { + private[classification] def evaluate(dataset: DataFrame): BinaryLogisticRegressionSummary = { val t = udf { features: Vector => raw2probabilityInPlace(predictRaw(features)) } val labelsAndScores = dataset. select(col($(labelCol)), t(col($(featuresCol))).as($(probabilityCol))) - new LogisticRegressionSummary(labelsAndScores, $(probabilityCol), $(labelCol)) + new BinaryLogisticRegressionSummary(labelsAndScores, $(probabilityCol), $(labelCol)) } /** @@ -449,17 +450,15 @@ private[classification] class MultiClassSummarizer extends Serializable { } } -@Experimental /** - * :: Experimental :: - * Logistic regression training results. + * Abstract for multinomial Logistic regression training results. * @param predictions dataframe outputted by the model's `transform` method. * @param probabilityCol field in "predictions" which gives the calibrated probability of * each sample as a vector. * @param labelCol field in "predictions" which gives the true label of each sample. * @param objectiveHistory objective function (scaled loss + regularization) at each iteration. 
*/ -class LogisticRegressionTrainingSummary private[classification] ( +private [classification] class LogisticRegressionTrainingSummary private[classification] ( predictions: DataFrame, probabilityCol: String, labelCol: String, @@ -468,31 +467,67 @@ class LogisticRegressionTrainingSummary private[classification] ( /** Number of training iterations until termination */ val totalIterations = objectiveHistory.length - } -@Experimental /** * :: Experimental :: - * Logistic regression results for a given model. + * Logistic regression training results. + * @param predictions dataframe outputted by the model's `transform` method. + * @param probabilityCol field in "predictions" which gives the calibrated probability of + * each sample as a vector. + * @param labelCol field in "predictions" which gives the true label of each sample. + * @param objectiveHistory objective function (scaled loss + regularization) at each iteration. + */ +@Experimental +class BinaryLogisticRegressionTrainingSummary private[classification] ( + predictions: DataFrame, + probabilityCol: String, + labelCol: String, + val objectiveHistory: Array[Double]) + extends BinaryLogisticRegressionSummary(predictions, probabilityCol, labelCol) { + + /** Number of training iterations until termination */ + val totalIterations = objectiveHistory.length +} + +/** + * Abstraction for Multiclass logistic regression results for a given model. * @param predictions dataframe outputted by the model's `transform` method. * @param probabilityCol field in "predictions" which gives the calibrated probability of * each sample. * @param labelCol field in "predictions" which gives the true label of each sample. */ -class LogisticRegressionSummary private[classification] ( +private [classification] class LogisticRegressionSummary private [classification] ( @transient val predictions: DataFrame, val probabilityCol: String, val labelCol: String) extends Serializable { +} + +/** + * :: Experimental :: + * Binary Logistic regression results for a given model. + * @param predictions dataframe outputted by the model's `transform` method. + * @param probabilityCol field in "predictions" which gives the calibrated probability of + * each sample. + * @param labelCol field in "predictions" which gives the true label of each sample. + */ +@Experimental +class BinaryLogisticRegressionSummary private[classification] ( + @transient override val predictions: DataFrame, + override val probabilityCol: String, + override val labelCol: String) + extends LogisticRegressionSummary(predictions, probabilityCol, labelCol) { + private val sqlContext = predictions.sqlContext import sqlContext.implicits._ - /** Returns a BinaryClassificationMetrics object. - */ + /** + * Returns a BinaryClassificationMetrics object. + */ // TODO: Allow the user to vary the number of bins using a setBins method in // BinaryClassificationMetrics. For now the default is set to 100. - @transient private val metrics = new BinaryClassificationMetrics( + @transient private val binaryMetrics = new BinaryClassificationMetrics( predictions.select(probabilityCol, labelCol).map { case Row(score: Vector, label: Double) => (score(1), label) }, 100 @@ -500,33 +535,28 @@ class LogisticRegressionSummary private[classification] ( /** * Returns the receiver operating characteristic (ROC) curve, - * which is an Dataframe having two fields (false positive rate, true positive rate) + * which is an Dataframe having two fields (FPR, TPR) * with (0.0, 0.0) prepended and (1.0, 1.0) appended to it. 
- * Every possible probability obtained in transforming the dataset are used - * as thresholds used in calculating the FPR and TPR. + * @see http://en.wikipedia.org/wiki/Receiver_operating_characteristic */ - def roc(): DataFrame = metrics.roc().toDF("FalsePositiveRate", "TruePositiveRate") + def roc(): DataFrame = binaryMetrics.roc().toDF("FPR", "TPR") /** * Computes the area under the receiver operating characteristic (ROC) curve. */ - def areaUnderROC(): Double = metrics.areaUnderROC() + def areaUnderROC(): Double = binaryMetrics.areaUnderROC() /** * Returns the precision-recall curve, which is an Dataframe containing - * two fields (recall, precision) NOT (precision, recall), with (0.0, 1.0) prepended to it. - * Every possible probability obtained in transforming the dataset are used - * as thresholds used in calculating the precision and recall. + * two fields recall, precision with (0.0, 1.0) prepended to it. */ - def pr(): DataFrame = metrics.pr().toDF("recall", "precision") + def pr(): DataFrame = binaryMetrics.pr().toDF("recall", "precision") /** * Returns a dataframe with two fields (threshold, F-Measure) curve with beta = 1.0. - * Every possible probability obtained in transforming the dataset are used - * as thresholds used in calculating the F-measure. */ def fMeasureByThreshold(): DataFrame = { - metrics.fMeasureByThreshold().toDF("threshold", "F-Measure") + binaryMetrics.fMeasureByThreshold().toDF("threshold", "F-Measure") } /** @@ -535,7 +565,7 @@ class LogisticRegressionSummary private[classification] ( * as thresholds used in calculating the precision. */ def precisionByThreshold(): DataFrame = { - metrics.precisionByThreshold().toDF("threshold", "precision") + binaryMetrics.precisionByThreshold().toDF("threshold", "precision") } /** @@ -543,7 +573,9 @@ class LogisticRegressionSummary private[classification] ( * Every possible probability obtained in transforming the dataset are used * as thresholds used in calculating the recall. */ - def recallByThreshold(): DataFrame = metrics.recallByThreshold().toDF("threshold", "recall") + def recallByThreshold(): DataFrame = { + binaryMetrics.recallByThreshold().toDF("threshold", "recall") + } } /** From d775371deb987d22f1469a3b10b3d97b850615da Mon Sep 17 00:00:00 2001 From: MechCoder Date: Tue, 4 Aug 2015 12:02:59 +0530 Subject: [PATCH 7/8] Clean up class inheritance --- .../classification/LogisticRegression.scala | 53 +++++++------------ 1 file changed, 19 insertions(+), 34 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala index 24d533bb55128..e957dae10dfd9 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala @@ -451,22 +451,23 @@ private[classification] class MultiClassSummarizer extends Serializable { } /** - * Abstract for multinomial Logistic regression training results. - * @param predictions dataframe outputted by the model's `transform` method. - * @param probabilityCol field in "predictions" which gives the calibrated probability of - * each sample as a vector. - * @param labelCol field in "predictions" which gives the true label of each sample. - * @param objectiveHistory objective function (scaled loss + regularization) at each iteration. + * Abstraction for multinomial Logistic Regression Training results. 
*/ -private [classification] class LogisticRegressionTrainingSummary private[classification] ( - predictions: DataFrame, - probabilityCol: String, - labelCol: String, - val objectiveHistory: Array[Double]) - extends LogisticRegressionSummary(predictions, probabilityCol, labelCol) { +sealed trait LogisticRegressionTrainingSummary extends Serializable { + + /** objective function (scaled loss + regularization) at each iteration. */ + val objectiveHistory: Array[Double] /** Number of training iterations until termination */ - val totalIterations = objectiveHistory.length + val totalIterations: Int = objectiveHistory.length + +} + +/** + * Abstraction for Logistic Regression Results for a given model. + */ +sealed trait LogisticRegressionSummary extends Serializable { + } /** @@ -484,23 +485,8 @@ class BinaryLogisticRegressionTrainingSummary private[classification] ( probabilityCol: String, labelCol: String, val objectiveHistory: Array[Double]) - extends BinaryLogisticRegressionSummary(predictions, probabilityCol, labelCol) { - - /** Number of training iterations until termination */ - val totalIterations = objectiveHistory.length -} - -/** - * Abstraction for Multiclass logistic regression results for a given model. - * @param predictions dataframe outputted by the model's `transform` method. - * @param probabilityCol field in "predictions" which gives the calibrated probability of - * each sample. - * @param labelCol field in "predictions" which gives the true label of each sample. - */ -private [classification] class LogisticRegressionSummary private [classification] ( - @transient val predictions: DataFrame, - val probabilityCol: String, - val labelCol: String) extends Serializable { + extends BinaryLogisticRegressionSummary(predictions, probabilityCol, labelCol) + with LogisticRegressionTrainingSummary { } @@ -514,10 +500,9 @@ private [classification] class LogisticRegressionSummary private [classification */ @Experimental class BinaryLogisticRegressionSummary private[classification] ( - @transient override val predictions: DataFrame, - override val probabilityCol: String, - override val labelCol: String) - extends LogisticRegressionSummary(predictions, probabilityCol, labelCol) { + @transient val predictions: DataFrame, + val probabilityCol: String, + val labelCol: String) extends LogisticRegressionSummary { private val sqlContext = predictions.sqlContext import sqlContext.implicits._ From 2e9f7c724a14d7170e0dd910195bde00bb54bd89 Mon Sep 17 00:00:00 2001 From: MechCoder Date: Thu, 6 Aug 2015 14:57:56 +0530 Subject: [PATCH 8/8] Change defs into lazy vals --- .../classification/LogisticRegression.scala | 46 +++++++++++-------- .../JavaLogisticRegressionSuite.java | 9 ++++ .../LogisticRegressionSuite.scala | 16 +++---- 3 files changed, 43 insertions(+), 28 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala index e957dae10dfd9..b0de1730ab55b 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala @@ -294,13 +294,13 @@ class LogisticRegressionModel private[ml] ( override val numClasses: Int = 2 - private var trainingSummary: Option[BinaryLogisticRegressionTrainingSummary] = None + private var trainingSummary: Option[LogisticRegressionTrainingSummary] = None /** * Gets summary of model on training set. 
An exception is
   * thrown if `trainingSummary == None`.
   */
-  def summary: BinaryLogisticRegressionTrainingSummary = trainingSummary match {
+  def summary: LogisticRegressionTrainingSummary = trainingSummary match {
     case Some(summ) => summ
     case None =>
       throw new SparkException(
@@ -309,7 +309,7 @@ class LogisticRegressionModel private[ml] (
   }
 
   private[classification] def setSummary(
-      summary: BinaryLogisticRegressionTrainingSummary): this.type = {
+      summary: LogisticRegressionTrainingSummary): this.type = {
     this.trainingSummary = Some(summary)
     this
   }
@@ -322,11 +322,8 @@ class LogisticRegressionModel private[ml] (
    * @param dataset Test dataset to evaluate model on.
    */
   // TODO: decide on a good name before exposing to public API
-  private[classification] def evaluate(dataset: DataFrame): BinaryLogisticRegressionSummary = {
-    val t = udf { features: Vector => raw2probabilityInPlace(predictRaw(features)) }
-    val labelsAndScores = dataset.
-      select(col($(labelCol)), t(col($(featuresCol))).as($(probabilityCol)))
-    new BinaryLogisticRegressionSummary(labelsAndScores, $(probabilityCol), $(labelCol))
+  private[classification] def evaluate(dataset: DataFrame): LogisticRegressionSummary = {
+    new BinaryLogisticRegressionSummary(this.transform(dataset), $(probabilityCol), $(labelCol))
   }
 
   /**
@@ -453,13 +450,13 @@ private[classification] class MultiClassSummarizer extends Serializable {
 /**
  * Abstraction for multinomial Logistic Regression Training results.
  */
-sealed trait LogisticRegressionTrainingSummary extends Serializable {
+sealed trait LogisticRegressionTrainingSummary extends LogisticRegressionSummary {
 
   /** objective function (scaled loss + regularization) at each iteration. */
-  val objectiveHistory: Array[Double]
+  def objectiveHistory: Array[Double]
 
   /** Number of training iterations until termination */
-  val totalIterations: Int = objectiveHistory.length
+  def totalIterations: Int = objectiveHistory.length
 
 }
 
 /**
@@ -468,6 +465,15 @@ sealed trait LogisticRegressionTrainingSummary extends Serializable {
  */
 sealed trait LogisticRegressionSummary extends Serializable {
 
+  /** DataFrame output by the model's `transform` method. */
+  def predictions: DataFrame
+
+  /** Field in "predictions" which gives the calibrated probability of each sample as a vector. */
+  def probabilityCol: String
+
+  /** Field in "predictions" which gives the true label of each sample. */
+  def labelCol: String
+
 }
 
 /**
@@ -500,9 +506,9 @@ class BinaryLogisticRegressionTrainingSummary private[classification] (
  */
 @Experimental
 class BinaryLogisticRegressionSummary private[classification] (
-    @transient val predictions: DataFrame,
-    val probabilityCol: String,
-    val labelCol: String) extends LogisticRegressionSummary {
+    @transient override val predictions: DataFrame,
+    override val probabilityCol: String,
+    override val labelCol: String) extends LogisticRegressionSummary {
 
   private val sqlContext = predictions.sqlContext
   import sqlContext.implicits._
@@ -524,23 +530,23 @@ class BinaryLogisticRegressionSummary private[classification] (
    * with (0.0, 0.0) prepended and (1.0, 1.0) appended to it.
    * @see http://en.wikipedia.org/wiki/Receiver_operating_characteristic
    */
-  def roc(): DataFrame = binaryMetrics.roc().toDF("FPR", "TPR")
+  @transient lazy val roc: DataFrame = binaryMetrics.roc().toDF("FPR", "TPR")
 
   /**
   * Computes the area under the receiver operating characteristic (ROC) curve.
   */
-  def areaUnderROC(): Double = binaryMetrics.areaUnderROC()
+  lazy val areaUnderROC: Double = binaryMetrics.areaUnderROC()
 
   /**
    * Returns the precision-recall curve, which is an Dataframe containing
    * two fields recall, precision with (0.0, 1.0) prepended to it.
    */
-  def pr(): DataFrame = binaryMetrics.pr().toDF("recall", "precision")
+  @transient lazy val pr: DataFrame = binaryMetrics.pr().toDF("recall", "precision")
 
   /**
    * Returns a dataframe with two fields (threshold, F-Measure) curve with beta = 1.0.
   */
-  def fMeasureByThreshold(): DataFrame = {
+  @transient lazy val fMeasureByThreshold: DataFrame = {
     binaryMetrics.fMeasureByThreshold().toDF("threshold", "F-Measure")
   }
 
   /**
@@ -549,7 +555,7 @@ class BinaryLogisticRegressionSummary private[classification] (
    * Every possible probability obtained in transforming the dataset are used
    * as thresholds used in calculating the precision.
   */
-  def precisionByThreshold(): DataFrame = {
+  @transient lazy val precisionByThreshold: DataFrame = {
     binaryMetrics.precisionByThreshold().toDF("threshold", "precision")
   }
 
   /**
@@ -558,7 +564,7 @@ class BinaryLogisticRegressionSummary private[classification] (
    * Every possible probability obtained in transforming the dataset are used
    * as thresholds used in calculating the recall.
   */
-  def recallByThreshold(): DataFrame = {
+  @transient lazy val recallByThreshold: DataFrame = {
     binaryMetrics.recallByThreshold().toDF("threshold", "recall")
   }
 }
diff --git a/mllib/src/test/java/org/apache/spark/ml/classification/JavaLogisticRegressionSuite.java b/mllib/src/test/java/org/apache/spark/ml/classification/JavaLogisticRegressionSuite.java
index f75e024a713ee..d61bad56e1b05 100644
--- a/mllib/src/test/java/org/apache/spark/ml/classification/JavaLogisticRegressionSuite.java
+++ b/mllib/src/test/java/org/apache/spark/ml/classification/JavaLogisticRegressionSuite.java
@@ -147,4 +147,13 @@ public void logisticRegressionPredictorClassifierMethods() {
       }
     }
   }
+
+  @Test
+  public void logisticRegressionTrainingSummary() {
+    LogisticRegression lr = new LogisticRegression();
+    LogisticRegressionModel model = lr.fit(dataset);
+
+    LogisticRegressionTrainingSummary summary = model.summary();
+    assert(summary.totalIterations() == summary.objectiveHistory().length);
+  }
 }
diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
index 2b660b4f1f538..33cb75637ff02 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
@@ -709,17 +709,17 @@ class LogisticRegressionSuite extends SparkFunSuite with MLlibTestSparkContext {
       .setRegParam(1.0)
       .setThreshold(0.6)
     val model = lr.fit(dataset)
-    val summary = model.summary
+    val summary = model.summary.asInstanceOf[BinaryLogisticRegressionSummary]
 
-    val sameSummary = model.evaluate(dataset)
-    assert(summary.areaUnderROC() === sameSummary.areaUnderROC())
-    assert(summary.roc().collect() === sameSummary.roc.collect())
-    assert(summary.pr().collect() === sameSummary.pr.collect())
+    val sameSummary = model.evaluate(dataset).asInstanceOf[BinaryLogisticRegressionSummary]
+    assert(summary.areaUnderROC === sameSummary.areaUnderROC)
+    assert(summary.roc.collect() === sameSummary.roc.collect())
+    assert(summary.pr.collect() === sameSummary.pr.collect())
     assert(
-      summary.fMeasureByThreshold().collect() === sameSummary.fMeasureByThreshold().collect())
-    assert(summary.recallByThreshold().collect() === sameSummary.recallByThreshold().collect())
+      summary.fMeasureByThreshold.collect() === sameSummary.fMeasureByThreshold.collect())
+    assert(summary.recallByThreshold.collect() === sameSummary.recallByThreshold.collect())
     assert(
-      summary.precisionByThreshold().collect() === sameSummary.precisionByThreshold().collect())
+      summary.precisionByThreshold.collect() === sameSummary.precisionByThreshold.collect())
   }
 
   test("statistics on training data") {
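
Usage sketch of the summary API as it stands after patch 8/8. This is illustrative
only, not part of any patch; `training` is a stand-in for any DataFrame of
(label, features) rows such as the `dataset` used in the test suites above:

    import org.apache.spark.ml.classification.{BinaryLogisticRegressionSummary, LogisticRegression}

    val lr = new LogisticRegression().setMaxIter(10).setRegParam(1.0)
    val model = lr.fit(training)

    // fit() attaches a training summary; hasSummary guards against the
    // SparkException thrown by model.summary when none was recorded.
    if (model.hasSummary) {
      val trainingSummary = model.summary
      println(s"converged in ${trainingSummary.totalIterations} iterations")
      trainingSummary.objectiveHistory.foreach(println)

      // Binary-classification metrics require the downcast used in the tests,
      // since summary is typed as the generic LogisticRegressionTrainingSummary.
      val binarySummary = trainingSummary.asInstanceOf[BinaryLogisticRegressionSummary]
      println(s"area under ROC: ${binarySummary.areaUnderROC}")
      binarySummary.roc.show()                 // columns: FPR, TPR
      binarySummary.fMeasureByThreshold.show() // columns: threshold, F-Measure
    }

Note that evaluate(dataset) is still private[classification] at this point in the
series, so held-out metrics are reachable only from within the package until the
TODO about its public name is resolved.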