From be6c18b287876befbe9321cbda2e4c94d5deb2e1 Mon Sep 17 00:00:00 2001
From: zhengruifeng
Date: Fri, 10 Jan 2020 19:05:35 +0800
Subject: [PATCH 01/10] add InstanceBlock and use it in LinearSVC

---
 .../scala/org/apache/spark/ml/Predictor.scala |  16 ++-
 .../spark/ml/classification/LinearSVC.scala   |  34 +++--
 .../apache/spark/ml/feature/Instance.scala    | 123 +++++++++++++++++-
 .../ml/optim/aggregator/HingeAggregator.scala |  69 ++++++++--
 .../ml/classification/LinearSVCSuite.scala    |   2 +-
 .../aggregator/HingeAggregatorSuite.scala     |  35 +++--
 6 files changed, 241 insertions(+), 38 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala b/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala
index 98dd692cbe55d..6b628e62fd549 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala
@@ -18,8 +18,8 @@
 package org.apache.spark.ml
 
 import org.apache.spark.annotation.{DeveloperApi, Since}
-import org.apache.spark.ml.feature.{Instance, LabeledPoint}
-import org.apache.spark.ml.linalg.{Vector, VectorUDT}
+import org.apache.spark.ml.feature.{Instance, InstanceBlock, LabeledPoint}
+import org.apache.spark.ml.linalg._
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared._
 import org.apache.spark.ml.util.SchemaUtils
@@ -83,6 +83,18 @@ private[ml] trait PredictorParams extends Params
     }
   }
 
+  /**
+   * Extract [[labelCol]], weightCol(if any) and [[featuresCol]] from the given dataset,
+   * and put it in an RDD with strong types.
+   */
+  protected def extractInstanceBlocks(
+      dataset: Dataset[_],
+      blockSize: Int): RDD[InstanceBlock] = {
+    require(blockSize > 0)
+    extractInstances(dataset)
+      .mapPartitions { _.grouped(blockSize).map(InstanceBlock.fromInstances) }
+  }
+
   /**
    * Extract [[labelCol]], weightCol(if any) and [[featuresCol]] from the given dataset,
    * and put it in an RDD with strong types.
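
An aside on the blocking scheme (illustration only, not part of the patch): extractInstanceBlocks leans on Scala's Iterator.grouped inside mapPartitions, so blocks never cross partition boundaries and only the last block of a partition may be smaller than blockSize. A minimal, self-contained sketch of just that grouping behavior, in plain Scala with a hypothetical Inst stand-in for ml.feature.Instance:

// A minimal sketch, assuming only the Scala standard library.
object BlockingSketch {
  final case class Inst(label: Double, weight: Double, features: Array[Double])

  def main(args: Array[String]): Unit = {
    // one partition's worth of instances
    val partition: Iterator[Inst] =
      Iterator.tabulate(10)(i => Inst(i % 2, 1.0, Array(i.toDouble)))
    // grouped(4) emits blocks of 4, 4 and finally 2 instances: blocks are
    // cut per partition, and only the trailing block may be undersized
    val blockSizes = partition.grouped(4).map(_.length).toSeq
    assert(blockSizes == Seq(4, 4, 2))
    println(blockSizes.mkString(", "))
  }
}
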
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala index 905789090d625..dba252932c7ec 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala @@ -26,7 +26,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.SparkException import org.apache.spark.annotation.Since import org.apache.spark.internal.Logging -import org.apache.spark.ml.feature.Instance +import org.apache.spark.ml.feature.{Instance, InstanceBlock} import org.apache.spark.ml.linalg._ import org.apache.spark.ml.optim.aggregator.HingeAggregator import org.apache.spark.ml.optim.loss.{L2Regularization, RDDLossFunction} @@ -159,16 +159,14 @@ class LinearSVC @Since("2.2.0") ( override def copy(extra: ParamMap): LinearSVC = defaultCopy(extra) override protected def train(dataset: Dataset[_]): LinearSVCModel = instrumented { instr => - val handlePersistence = dataset.storageLevel == StorageLevel.NONE - - val instances = extractInstances(dataset) - if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK) - instr.logPipelineStage(this) instr.logDataset(dataset) instr.logParams(this, labelCol, weightCol, featuresCol, predictionCol, rawPredictionCol, regParam, maxIter, fitIntercept, tol, standardization, threshold, aggregationDepth) + val sc = dataset.sparkSession.sparkContext + val instances = extractInstances(dataset) + val (summarizer, labelSummarizer) = instances.treeAggregate( (Summarizer.createSummarizerBuffer("mean", "std", "count"), new MultiClassSummarizer))( seqOp = (c: (SummarizerBuffer, MultiClassSummarizer), instance: Instance) => @@ -208,10 +206,10 @@ class LinearSVC @Since("2.2.0") ( throw new SparkException(msg) } - val featuresStd = summarizer.std.toArray + val featuresStd = summarizer.std.compressed + val bcFeaturesStd = sc.broadcast(featuresStd) val getFeaturesStd = (j: Int) => featuresStd(j) val regParamL2 = $(regParam) - val bcFeaturesStd = instances.context.broadcast(featuresStd) val regularization = if (regParamL2 != 0.0) { val shouldApply = (idx: Int) => idx >= 0 && idx < numFeatures Some(new L2Regularization(regParamL2, shouldApply, @@ -220,8 +218,22 @@ class LinearSVC @Since("2.2.0") ( None } - val getAggregatorFunc = new HingeAggregator(bcFeaturesStd, $(fitIntercept))(_) - val costFun = new RDDLossFunction(instances, getAggregatorFunc, regularization, + val blocks = instances.mapPartitions { iter => + val featuresStdVec = bcFeaturesStd.value + iter.map { case Instance(label, weight, features) => + val standardized = Array.ofDim[Double](numFeatures) + features.foreachNonZero { (i, v) => + val std = featuresStdVec(i) + if (std != 0) { + standardized(i) = v / std + } + } + Instance(label, weight, Vectors.dense(standardized).compressed) + }.grouped(4096).map(InstanceBlock.fromInstances) + }.persist(StorageLevel.MEMORY_AND_DISK).setName("training dataset (blockSize=4096)") + + val getAggregatorFunc = new HingeAggregator(numFeatures, $(fitIntercept))(_) + val costFun = new RDDLossFunction(blocks, getAggregatorFunc, regularization, $(aggregationDepth)) def regParamL1Fun = (index: Int) => 0D @@ -238,6 +250,7 @@ class LinearSVC @Since("2.2.0") ( scaledObjectiveHistory += state.adjustedValue } + blocks.unpersist() bcFeaturesStd.destroy() if (state == null) { val msg = s"${optimizer.getClass.getName} failed." 
@@ -268,7 +281,6 @@ class LinearSVC @Since("2.2.0") (
       (Vectors.dense(coefficientArray), intercept, scaledObjectiveHistory.result())
     }
 
-    if (handlePersistence) instances.unpersist()
     copyValues(new LinearSVCModel(uid, coefficientVector, interceptVector))
   }
 
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala
index 11d0c4689cbba..9af4bf4077cd9 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala
@@ -17,7 +17,9 @@
 
 package org.apache.spark.ml.feature
 
-import org.apache.spark.ml.linalg.Vector
+import scala.collection.mutable
+
+import org.apache.spark.ml.linalg._
 
 /**
  * Class that represents an instance of weighted data point with label and features.
@@ -28,6 +30,125 @@ import org.apache.spark.ml.linalg.Vector
  */
 private[spark] case class Instance(label: Double, weight: Double, features: Vector)
 
+
+/**
+ * Class that represents a block of instances.
+ */
+private[spark] class InstanceBlock(
+    private val labels: Array[Double],
+    private val weights: Array[Double],
+    private val featureMatrix: Matrix) {
+  require(labels.length == featureMatrix.numRows)
+  require(featureMatrix.isTransposed)
+  if (weights.nonEmpty) {
+    require(labels.length == weights.length)
+  }
+
+  def size: Int = labels.length
+
+  def numFeatures: Int = featureMatrix.numCols
+
+  def instanceIterator: Iterator[Instance] = {
+    if (weights.nonEmpty) {
+      labels.iterator.zip(weights.iterator).zip(featureMatrix.rowIter)
+        .map { case ((label, weight), vec) => Instance(label, weight, vec) }
+    } else {
+      labels.iterator.zip(featureMatrix.rowIter)
+        .map { case (label, vec) => Instance(label, 1.0, vec) }
+    }
+  }
+
+  def getLabel(i: Int): Double = labels(i)
+
+  def labelIter: Iterator[Double] = labels.iterator
+
+  def getWeight(i: Int): Double = {
+    if (weights.nonEmpty) {
+      weights(i)
+    } else {
+      1.0
+    }
+  }
+
+  def weightIter: Iterator[Double] = {
+    if (weights.nonEmpty) {
+      weights.iterator
+    } else {
+      Iterator.fill(size)(1.0)
+    }
+  }
+
+  // directly get the non-zero iterator of i-th row vector without array copy or slice
+  val getNonZeroIter: Int => Iterator[(Int, Double)] = {
+    featureMatrix match {
+      case dm: DenseMatrix =>
+        (i: Int) =>
+          val start = numFeatures * i
+          Iterator.tabulate(numFeatures)(j => (j, dm.values(start + j)))
+            .filter(_._2 != 0)
+      case sm: SparseMatrix =>
+        (i: Int) =>
+          val start = sm.colPtrs(i)
+          val end = sm.colPtrs(i + 1)
+          Iterator.tabulate(end - start)(i =>
+            (sm.rowIndices(start + i), sm.values(start + i))
+          ).filter(_._2 != 0)
+    }
+  }
+}
+
+private[spark] object InstanceBlock {
+
+  def fromInstances(instances: Seq[Instance]): InstanceBlock = {
+    val labels = instances.map(_.label).toArray
+    val weights = if (instances.exists(_.weight != 1)) {
+      instances.map(_.weight).toArray
+    } else {
+      Array.emptyDoubleArray
+    }
+    val numRows = instances.length
+    val numCols = instances.head.features.size
+    val denseSize = Matrices.getDenseSize(numCols, numRows)
+    val nnz = instances.iterator.map(_.features.numNonzeros).sum
+    val sparseSize = Matrices.getSparseSize(nnz, numRows + 1)
+    val matrix = if (denseSize < sparseSize) {
+      val values = Array.ofDim[Double](numRows * numCols)
+      var offset = 0
+      var j = 0
+      while (j < numRows) {
+        instances(j).features.foreachNonZero { (i, v) =>
+          values(offset + i) = v
+        }
+        offset += numCols
+        j += 1
+      }
+      new DenseMatrix(numRows, numCols, values, true)
+    } else {
+      val colIndices = 
mutable.ArrayBuilder.make[Int] + val values = mutable.ArrayBuilder.make[Double] + val rowPtrs = mutable.ArrayBuilder.make[Int] + var rowPtr = 0 + rowPtrs += 0 + var j = 0 + while (j < numRows) { + var nnz = 0 + instances(j).features.foreachNonZero { (i, v) => + colIndices += i + values += v + nnz += 1 + } + rowPtr += nnz + rowPtrs += rowPtr + j += 1 + } + new SparseMatrix(numRows, numCols, rowPtrs.result(), + colIndices.result(), values.result(), true) + } + new InstanceBlock(labels, weights, matrix) + } +} + + /** * Case class that represents an instance of data point with * label, weight, offset and features. diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala index b0906f1b06511..b1050c99f5664 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala @@ -18,7 +18,7 @@ package org.apache.spark.ml.optim.aggregator import org.apache.spark.broadcast.Broadcast -import org.apache.spark.ml.feature.Instance +import org.apache.spark.ml.feature.{Instance, InstanceBlock} import org.apache.spark.ml.linalg._ /** @@ -32,14 +32,12 @@ import org.apache.spark.ml.linalg._ * * @param bcCoefficients The coefficients corresponding to the features. * @param fitIntercept Whether to fit an intercept term. - * @param bcFeaturesStd The standard deviation values of the features. */ private[ml] class HingeAggregator( - bcFeaturesStd: Broadcast[Array[Double]], + numFeatures: Int, fitIntercept: Boolean)(bcCoefficients: Broadcast[Vector]) - extends DifferentiableLossAggregator[Instance, HingeAggregator] { + extends DifferentiableLossAggregator[InstanceBlock, HingeAggregator] { - private val numFeatures: Int = bcFeaturesStd.value.length private val numFeaturesPlusIntercept: Int = if (fitIntercept) numFeatures + 1 else numFeatures @transient private lazy val coefficientsArray = bcCoefficients.value match { case DenseVector(values) => values @@ -62,16 +60,13 @@ private[ml] class HingeAggregator( require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0") if (weight == 0.0) return this - val localFeaturesStd = bcFeaturesStd.value val localCoefficients = coefficientsArray val localGradientSumArray = gradientSumArray val dotProduct = { var sum = 0.0 features.foreachNonZero { (index, value) => - if (localFeaturesStd(index) != 0.0) { - sum += localCoefficients(index) * value / localFeaturesStd(index) - } + sum += localCoefficients(index) * value } if (fitIntercept) sum += localCoefficients(numFeaturesPlusIntercept - 1) sum @@ -88,9 +83,7 @@ private[ml] class HingeAggregator( if (1.0 > labelScaled * dotProduct) { val gradientScale = -labelScaled * weight features.foreachNonZero { (index, value) => - if (localFeaturesStd(index) != 0.0) { - localGradientSumArray(index) += value * gradientScale / localFeaturesStd(index) - } + localGradientSumArray(index) += value * gradientScale } if (fitIntercept) { localGradientSumArray(localGradientSumArray.length - 1) += gradientScale @@ -102,4 +95,56 @@ private[ml] class HingeAggregator( this } } + + /** + * Add a new training instance to this HingeAggregator, and update the loss and gradient + * of the objective function. + * + * @param instanceBlock The InstanceBlock to be added. + * @return This HingeAggregator object. 
+ */ + def add(instanceBlock: InstanceBlock): this.type = { +// instanceBlock.instanceIterator.foreach(this.add) + require(numFeatures == instanceBlock.numFeatures, s"Dimensions mismatch when adding new " + + s"instance. Expecting $numFeatures but got ${instanceBlock.numFeatures}.") + require(instanceBlock.weightIter.forall(_ >= 0), + s"instance weights ${instanceBlock.weightIter.mkString("[", ",", "]")} has to be >= 0.0") + + if (instanceBlock.weightIter.forall(_ == 0)) return this + val localCoefficients = coefficientsArray + val localGradientSumArray = gradientSumArray + val intercept = if (fitIntercept) localCoefficients(numFeatures) else 0.0 + + var i = 0 + while (i < instanceBlock.size) { + val weight = instanceBlock.getWeight(i) + if (weight > 0) { + var dot = intercept + instanceBlock.getNonZeroIter(i).foreach { case (index, value) => + dot += localCoefficients(index) * value + } + + // Our loss function with {0, 1} labels is max(0, 1 - (2y - 1) (f_w(x))) + // Therefore the gradient is -(2y - 1)*x + val label = instanceBlock.getLabel(i) + val labelScaled = 2 * label - 1.0 + val loss = math.max((1.0 - labelScaled * dot) * weight, 0.0) + + if (loss != 0) { + val gradientScale = -labelScaled * weight + instanceBlock.getNonZeroIter(i).foreach { case (index, value) => + localGradientSumArray(index) += value * gradientScale + } + if (fitIntercept) { + localGradientSumArray(numFeatures) += gradientScale + } + lossSum += loss + } + weightSum += weight + } + i += 1 + } + + this + } } diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala index c2072cea11859..2b63dc259a14f 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala @@ -179,7 +179,7 @@ class LinearSVCSuite extends MLTest with DefaultReadWriteTest { test("sparse coefficients in HingeAggregator") { val bcCoefficients = spark.sparkContext.broadcast(Vectors.sparse(2, Array(0), Array(1.0))) val bcFeaturesStd = spark.sparkContext.broadcast(Array(1.0)) - val agg = new HingeAggregator(bcFeaturesStd, true)(bcCoefficients) + val agg = new HingeAggregator(1, true)(bcCoefficients) val thrown = withClue("LinearSVCAggregator cannot handle sparse coefficients") { intercept[IllegalArgumentException] { agg.add(Instance(1.0, 1.0, Vectors.dense(1.0))) diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala index 61b48ffa10944..33b0eef9617b1 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala @@ -32,21 +32,21 @@ class HingeAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { override def beforeAll(): Unit = { super.beforeAll() - instances = Array( + instances = standardize(Array( Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)), Instance(1.0, 0.5, Vectors.dense(1.5, 1.0)), Instance(0.0, 0.3, Vectors.dense(4.0, 0.5)) - ) - instancesConstantFeature = Array( + )) + instancesConstantFeature = standardize(Array( Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)), Instance(1.0, 0.5, Vectors.dense(1.0, 1.0)), Instance(1.0, 0.3, Vectors.dense(1.0, 0.5)) - ) - instancesConstantFeatureFiltered = Array( + )) + instancesConstantFeatureFiltered = standardize(Array( 
      Instance(0.0, 0.1, Vectors.dense(2.0)),
       Instance(1.0, 0.5, Vectors.dense(1.0)),
       Instance(2.0, 0.3, Vectors.dense(0.5))
-    )
+    ))
   }
 
   /** Get summary statistics for some data and create a new HingeAggregator. */
@@ -54,12 +54,25 @@ class HingeAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
       instances: Array[Instance],
       coefficients: Vector,
       fitIntercept: Boolean): HingeAggregator = {
-    val (featuresSummarizer, ySummarizer) =
-      DifferentiableLossAggregatorSuite.getClassificationSummarizers(instances)
-    val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt)
-    val bcFeaturesStd = spark.sparkContext.broadcast(featuresStd)
     val bcCoefficients = spark.sparkContext.broadcast(coefficients)
-    new HingeAggregator(bcFeaturesStd, fitIntercept)(bcCoefficients)
+    new HingeAggregator(instances.head.features.size, fitIntercept)(bcCoefficients)
+  }
+
+  private def standardize(instances: Array[Instance]): Array[Instance] = {
+    val (featuresSummarizer, _) =
+      DifferentiableLossAggregatorSuite.getClassificationSummarizers(instances)
+    val stdArray = featuresSummarizer.variance.toArray.map(math.sqrt)
+    val numFeatures = stdArray.length
+    instances.map { case Instance(label, weight, features) =>
+      val standardized = Array.ofDim[Double](numFeatures)
+      features.foreachNonZero { (i, v) =>
+        val std = stdArray(i)
+        if (std != 0) {
+          standardized(i) = v / std
+        }
+      }
+      Instance(label, weight, Vectors.dense(standardized).compressed)
+    }
   }
 
   test("aggregator add method input size") {

From f952e0fe7c6017fa65b599d3d587f8a4cf73937f Mon Sep 17 00:00:00 2001
From: zhengruifeng
Date: Sat, 25 Jan 2020 15:02:27 +0800
Subject: [PATCH 02/10] use BLAS gemv for block-level aggregation

---
 .../spark/serializer/KryoSerializer.scala     |  1 +
 .../scala/org/apache/spark/ml/Predictor.scala | 16 +---
 .../spark/ml/classification/LinearSVC.scala   | 33 +++++---
 .../apache/spark/ml/feature/Instance.scala    | 11 ++-
 .../ml/optim/aggregator/HingeAggregator.scala | 81 ++++++++++++-------
 .../ml/param/shared/SharedParamsCodeGen.scala |  6 +-
 .../spark/ml/param/shared/sharedParams.scala  | 19 +++++
 python/pyspark/ml/classification.py           | 23 ++++--
 .../ml/param/_shared_params_code_gen.py       |  5 +-
 python/pyspark/ml/param/shared.py             | 18 +++++
 10 files changed, 145 insertions(+), 68 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index cdaab599e2a0b..55ac2c410953b 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -502,6 +502,7 @@ private[serializer] object KryoSerializer {
     "org.apache.spark.ml.attribute.NumericAttribute",
 
     "org.apache.spark.ml.feature.Instance",
+    "org.apache.spark.ml.feature.InstanceBlock",
     "org.apache.spark.ml.feature.LabeledPoint",
     "org.apache.spark.ml.feature.OffsetInstance",
     "org.apache.spark.ml.linalg.DenseMatrix",
diff --git a/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala b/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala
index 6b628e62fd549..98dd692cbe55d 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala
@@ -18,8 +18,8 @@
 package org.apache.spark.ml
 
 import org.apache.spark.annotation.{DeveloperApi, Since}
-import org.apache.spark.ml.feature.{Instance, InstanceBlock, LabeledPoint}
-import org.apache.spark.ml.linalg._
+import org.apache.spark.ml.feature.{Instance, LabeledPoint}
+import 
org.apache.spark.ml.linalg.{Vector, VectorUDT} import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared._ import org.apache.spark.ml.util.SchemaUtils @@ -83,18 +83,6 @@ private[ml] trait PredictorParams extends Params } } - /** - * Extract [[labelCol]], weightCol(if any) and [[featuresCol]] from the given dataset, - * and put it in an RDD with strong types. - */ - protected def extractInstanceBlocks( - dataset: Dataset[_], - blockSize: Int): RDD[InstanceBlock] = { - require(blockSize > 0) - extractInstances(dataset) - .mapPartitions { _.grouped(blockSize).map(InstanceBlock.fromInstances) } - } - /** * Extract [[labelCol]], weightCol(if any) and [[featuresCol]] from the given dataset, * and put it in an RDD with strong types. diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala index dba252932c7ec..6e27bab87448d 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala @@ -41,7 +41,7 @@ import org.apache.spark.storage.StorageLevel /** Params for linear SVM Classifier. */ private[classification] trait LinearSVCParams extends ClassifierParams with HasRegParam with HasMaxIter with HasFitIntercept with HasTol with HasStandardization with HasWeightCol - with HasAggregationDepth with HasThreshold { + with HasAggregationDepth with HasThreshold with HasBlockSize { /** * Param for threshold in binary classification prediction. @@ -155,6 +155,15 @@ class LinearSVC @Since("2.2.0") ( def setAggregationDepth(value: Int): this.type = set(aggregationDepth, value) setDefault(aggregationDepth -> 2) + /** + * Set block size for stacking input data in matrices. + * Default is 4096. 
+   *
+   * @group expertSetParam
+   */
+  @Since("3.0.0")
+  def setBlockSize(value: Int): this.type = set(blockSize, value)
+
   @Since("2.2.0")
   override def copy(extra: ParamMap): LinearSVC = defaultCopy(extra)
 
@@ -162,7 +171,7 @@ class LinearSVC @Since("2.2.0") (
     instr.logPipelineStage(this)
     instr.logDataset(dataset)
     instr.logParams(this, labelCol, weightCol, featuresCol, predictionCol, rawPredictionCol,
-      regParam, maxIter, fitIntercept, tol, standardization, threshold, aggregationDepth)
+      regParam, maxIter, fitIntercept, tol, standardization, threshold, aggregationDepth, blockSize)
 
     val sc = dataset.sparkSession.sparkContext
     val instances = extractInstances(dataset)
@@ -218,19 +227,19 @@ class LinearSVC @Since("2.2.0") (
       None
     }
 
-    val blocks = instances.mapPartitions { iter =>
-      val featuresStdVec = bcFeaturesStd.value
-      iter.map { case Instance(label, weight, features) =>
-        val standardized = Array.ofDim[Double](numFeatures)
+    val standardized = instances.map {
+      case Instance(label, weight, features) =>
+        val featuresStdVec = bcFeaturesStd.value
+        val array = Array.ofDim[Double](numFeatures)
         features.foreachNonZero { (i, v) =>
           val std = featuresStdVec(i)
-          if (std != 0) {
-            standardized(i) = v / std
-          }
+          if (std != 0) array(i) = v / std
         }
-        Instance(label, weight, Vectors.dense(standardized).compressed)
-      }.grouped(4096).map(InstanceBlock.fromInstances)
-    }.persist(StorageLevel.MEMORY_AND_DISK).setName("training dataset (blockSize=4096)")
+        Instance(label, weight, Vectors.dense(array))
+    }
+    val blocks = InstanceBlock.blokify(standardized, $(blockSize))
+      .persist(StorageLevel.MEMORY_AND_DISK)
+      .setName(s"training dataset (blockSize=${$(blockSize)})")
 
     val getAggregatorFunc = new HingeAggregator(numFeatures, $(fitIntercept))(_)
     val costFun = new RDDLossFunction(blocks, getAggregatorFunc, regularization,
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala
index 9af4bf4077cd9..20e8cec9890a7 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala
@@ -20,6 +20,7 @@ package org.apache.spark.ml.feature
 import scala.collection.mutable
 
 import org.apache.spark.ml.linalg._
+import org.apache.spark.rdd.RDD
 
 /**
  * Class that represents an instance of weighted data point with label and features.
@@ -35,9 +36,9 @@ private[spark] case class Instance(label: Double, weight: Double, features: Vect
 * Class that represents a block of instances. 
*/ private[spark] class InstanceBlock( - private val labels: Array[Double], - private val weights: Array[Double], - private val featureMatrix: Matrix) { + val labels: Array[Double], + val weights: Array[Double], + val featureMatrix: Matrix) { require(labels.length == featureMatrix.numRows) require(featureMatrix.isTransposed) if (weights.nonEmpty) { @@ -146,6 +147,10 @@ private[spark] object InstanceBlock { } new InstanceBlock(labels, weights, matrix) } + + def blokify(instances: RDD[Instance], blockSize: Int): RDD[InstanceBlock] = { + instances.mapPartitions(_.grouped(blockSize).map(InstanceBlock.fromInstances)) + } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala index b1050c99f5664..cc1472ce9a603 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala @@ -39,12 +39,23 @@ private[ml] class HingeAggregator( extends DifferentiableLossAggregator[InstanceBlock, HingeAggregator] { private val numFeaturesPlusIntercept: Int = if (fitIntercept) numFeatures + 1 else numFeatures + protected override val dim: Int = numFeaturesPlusIntercept @transient private lazy val coefficientsArray = bcCoefficients.value match { case DenseVector(values) => values case _ => throw new IllegalArgumentException(s"coefficients only supports dense vector" + s" but got type ${bcCoefficients.value.getClass}.") } - protected override val dim: Int = numFeaturesPlusIntercept + @transient private lazy val (linear, intercept) = if (fitIntercept) { + (Vectors.dense(coefficientsArray.take(numFeatures)), coefficientsArray(numFeatures)) + } else { + (Vectors.dense(coefficientsArray), 0.0) + } + @transient private lazy val linearGradSumVec = if (fitIntercept) { + new DenseVector(Array.ofDim[Double](numFeatures)) + } else { + null + } + /** * Add a new training instance to this HingeAggregator, and update the loss and gradient @@ -100,51 +111,59 @@ private[ml] class HingeAggregator( * Add a new training instance to this HingeAggregator, and update the loss and gradient * of the objective function. * - * @param instanceBlock The InstanceBlock to be added. + * @param block The InstanceBlock to be added. * @return This HingeAggregator object. */ - def add(instanceBlock: InstanceBlock): this.type = { -// instanceBlock.instanceIterator.foreach(this.add) - require(numFeatures == instanceBlock.numFeatures, s"Dimensions mismatch when adding new " + - s"instance. Expecting $numFeatures but got ${instanceBlock.numFeatures}.") - require(instanceBlock.weightIter.forall(_ >= 0), - s"instance weights ${instanceBlock.weightIter.mkString("[", ",", "]")} has to be >= 0.0") - - if (instanceBlock.weightIter.forall(_ == 0)) return this - val localCoefficients = coefficientsArray + def add(block: InstanceBlock): this.type = { + require(numFeatures == block.numFeatures, s"Dimensions mismatch when adding new " + + s"instance. 
Expecting $numFeatures but got ${block.numFeatures}.") + require(block.weightIter.forall(_ >= 0), + s"instance weights ${block.weightIter.mkString("[", ",", "]")} has to be >= 0.0") + + if (block.weightIter.forall(_ == 0)) return this + val size = block.size val localGradientSumArray = gradientSumArray - val intercept = if (fitIntercept) localCoefficients(numFeatures) else 0.0 + val dots = if (intercept != 0) { + new DenseVector(Array.fill(size)(intercept)) + } else { + new DenseVector(Array.ofDim[Double](size)) + } + BLAS.gemv(1.0, block.featureMatrix, linear, 1.0, dots) + + val gradScaleArray = Array.ofDim[Double](size) var i = 0 - while (i < instanceBlock.size) { - val weight = instanceBlock.getWeight(i) + while (i < size) { + val weight = block.getWeight(i) if (weight > 0) { - var dot = intercept - instanceBlock.getNonZeroIter(i).foreach { case (index, value) => - dot += localCoefficients(index) * value - } - + weightSum += weight // Our loss function with {0, 1} labels is max(0, 1 - (2y - 1) (f_w(x))) // Therefore the gradient is -(2y - 1)*x - val label = instanceBlock.getLabel(i) + val label = block.getLabel(i) val labelScaled = 2 * label - 1.0 - val loss = math.max((1.0 - labelScaled * dot) * weight, 0.0) - - if (loss != 0) { - val gradientScale = -labelScaled * weight - instanceBlock.getNonZeroIter(i).foreach { case (index, value) => - localGradientSumArray(index) += value * gradientScale - } - if (fitIntercept) { - localGradientSumArray(numFeatures) += gradientScale - } + val loss = (1.0 - labelScaled * dots(i)) * weight + if (loss > 0) { lossSum += loss + gradScaleArray(i) = -labelScaled * weight } - weightSum += weight } i += 1 } + // predictions are all correct + if (gradScaleArray.forall(_ == 0)) return this + + val gradScaleVec = new DenseVector(gradScaleArray) + + if (fitIntercept) { + BLAS.gemv(1.0, block.featureMatrix.transpose, gradScaleVec, 0.0, linearGradSumVec) + linearGradSumVec.foreachNonZero { (i, v) => localGradientSumArray(i) += v } + localGradientSumArray(numFeatures) += gradScaleArray.sum + } else { + val gradSumVec = new DenseVector(localGradientSumArray) + BLAS.gemv(1.0, block.featureMatrix.transpose, gradScaleVec, 1.0, gradSumVec) + } + this } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala index 7ac680ec1183a..eee75e7f5b722 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala @@ -104,7 +104,11 @@ private[shared] object SharedParamsCodeGen { isValid = "ParamValidators.inArray(Array(\"euclidean\", \"cosine\"))"), ParamDesc[String]("validationIndicatorCol", "name of the column that indicates whether " + "each row is for training or for validation. False indicates training; true indicates " + - "validation.") + "validation."), + ParamDesc[Int]("blockSize", "block size for stacking input data in matrices. Data is " + + "stacked within partitions. 
If block size is more than remaining data in a partition " + + "then it is adjusted to the size of this data.", Some("4096"), + isValid = "ParamValidators.gt(0)", isExpertParam = true) ) val code = genSharedParams(params) diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala index 44c993eeafddc..3d1c55a5eb429 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala @@ -578,4 +578,23 @@ trait HasValidationIndicatorCol extends Params { /** @group getParam */ final def getValidationIndicatorCol: String = $(validationIndicatorCol) } + +/** + * Trait for shared param blockSize (default: 4096). This trait may be changed or + * removed between minor versions. + */ +@DeveloperApi +trait HasBlockSize extends Params { + + /** + * Param for block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data.. + * @group expertParam + */ + final val blockSize: IntParam = new IntParam(this, "blockSize", "block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data.", ParamValidators.gt(0)) + + setDefault(blockSize, 4096) + + /** @group expertGetParam */ + final def getBlockSize: Int = $(blockSize) +} // scalastyle:on diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py index 5ab8e606bda03..89d27fbfa316e 100644 --- a/python/pyspark/ml/classification.py +++ b/python/pyspark/ml/classification.py @@ -165,7 +165,8 @@ def predictProbability(self, value): class _LinearSVCParams(_JavaClassifierParams, HasRegParam, HasMaxIter, HasFitIntercept, HasTol, - HasStandardization, HasWeightCol, HasAggregationDepth, HasThreshold): + HasStandardization, HasWeightCol, HasAggregationDepth, HasThreshold, + HasBlockSize): """ Params for :py:class:`LinearSVC` and :py:class:`LinearSVCModel`. @@ -214,6 +215,8 @@ class LinearSVC(JavaClassifier, _LinearSVCParams, JavaMLWritable, JavaMLReadable LinearSVCModel... 
>>> model.getThreshold() 0.5 + >>> model.getBlockSize() + 4096 >>> model.coefficients DenseVector([0.0, -0.2792, -0.1833]) >>> model.intercept @@ -252,18 +255,19 @@ class LinearSVC(JavaClassifier, _LinearSVCParams, JavaMLWritable, JavaMLReadable def __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction", fitIntercept=True, standardization=True, threshold=0.0, weightCol=None, - aggregationDepth=2): + aggregationDepth=2, blockSize=4096): """ __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \ maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction", \ fitIntercept=True, standardization=True, threshold=0.0, weightCol=None, \ - aggregationDepth=2): + aggregationDepth=2, blockSize=4096): """ super(LinearSVC, self).__init__() self._java_obj = self._new_java_obj( "org.apache.spark.ml.classification.LinearSVC", self.uid) self._setDefault(maxIter=100, regParam=0.0, tol=1e-6, fitIntercept=True, - standardization=True, threshold=0.0, aggregationDepth=2) + standardization=True, threshold=0.0, aggregationDepth=2, + blockSize=4096) kwargs = self._input_kwargs self.setParams(**kwargs) @@ -272,12 +276,12 @@ def __init__(self, featuresCol="features", labelCol="label", predictionCol="pred def setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction", fitIntercept=True, standardization=True, threshold=0.0, weightCol=None, - aggregationDepth=2): + aggregationDepth=2, blockSize=4096): """ setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", \ maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction", \ fitIntercept=True, standardization=True, threshold=0.0, weightCol=None, \ - aggregationDepth=2): + aggregationDepth=2, blockSize=4096): Sets params for Linear SVM Classifier. """ kwargs = self._input_kwargs @@ -342,6 +346,13 @@ def setAggregationDepth(self, value): """ return self._set(aggregationDepth=value) + @since("3.0.0") + def setBlockSize(self, value): + """ + Sets the value of :py:attr:`blockSize`. + """ + return self._set(blockSize=value) + class LinearSVCModel(JavaClassificationModel, _LinearSVCParams, JavaMLWritable, JavaMLReadable): """ diff --git a/python/pyspark/ml/param/_shared_params_code_gen.py b/python/pyspark/ml/param/_shared_params_code_gen.py index ded3ca84b30f2..3994625c05f1b 100644 --- a/python/pyspark/ml/param/_shared_params_code_gen.py +++ b/python/pyspark/ml/param/_shared_params_code_gen.py @@ -164,7 +164,10 @@ def get$Name(self): "'euclidean'", "TypeConverters.toString"), ("validationIndicatorCol", "name of the column that indicates whether each row is for " + "training or for validation. False indicates training; true indicates validation.", - None, "TypeConverters.toString")] + None, "TypeConverters.toString"), + ("blockSize", "block size for stacking input data in matrices. Data is stacked within " + "partitions. 
If block size is more than remaining data in a partition then it is "
+     "adjusted to the size of this data.", "4096", "TypeConverters.toInt")]
 
     code = []
     for name, doc, defaultValueStr, typeConverter in shared:
diff --git a/python/pyspark/ml/param/shared.py b/python/pyspark/ml/param/shared.py
index 8fc115691f1ab..41ba7b9dc5523 100644
--- a/python/pyspark/ml/param/shared.py
+++ b/python/pyspark/ml/param/shared.py
@@ -580,3 +580,21 @@ def getValidationIndicatorCol(self):
         Gets the value of validationIndicatorCol or its default value.
         """
         return self.getOrDefault(self.validationIndicatorCol)
+
+
+class HasBlockSize(Params):
+    """
+    Mixin for param blockSize: block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data.
+    """
+
+    blockSize = Param(Params._dummy(), "blockSize", "block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data.", typeConverter=TypeConverters.toInt)
+
+    def __init__(self):
+        super(HasBlockSize, self).__init__()
+        self._setDefault(blockSize=4096)
+
+    def getBlockSize(self):
+        """
+        Gets the value of blockSize or its default value.
+        """
+        return self.getOrDefault(self.blockSize)

From 0ef0946dd128957e0124dbb0e4345828320801b9 Mon Sep 17 00:00:00 2001
From: zhengruifeng
Date: Sat, 25 Jan 2020 21:46:47 +0800
Subject: [PATCH 03/10] doc

---
 .../scala/org/apache/spark/ml/feature/Instance.scala | 10 ++++++----
 .../spark/ml/optim/aggregator/HingeAggregator.scala  |  2 +-
 2 files changed, 7 insertions(+), 5 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala
index 20e8cec9890a7..ec2ba9c0c8aaf 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala
@@ -34,6 +34,7 @@ private[spark] case class Instance(label: Double, weight: Double, features: Vect
 
 /**
  * Class that represents a block of instances.
+ * If all weights are 1, then an empty array is stored.
  */
@@ -85,14 +86,15 @@ private[spark] class InstanceBlock(
       case dm: DenseMatrix =>
         (i: Int) =>
           val start = numFeatures * i
-          Iterator.tabulate(numFeatures)(j => (j, dm.values(start + j)))
-            .filter(_._2 != 0)
+          Iterator.tabulate(numFeatures)(j =>
+            (j, dm.values(start + j))
+          ).filter(_._2 != 0)
       case sm: SparseMatrix =>
         (i: Int) =>
           val start = sm.colPtrs(i)
           val end = sm.colPtrs(i + 1)
-          Iterator.tabulate(end - start)(i =>
-            (sm.rowIndices(start + i), sm.values(start + i))
+          Iterator.tabulate(end - start)(j =>
+            (sm.rowIndices(start + j), sm.values(start + j))
           ).filter(_._2 != 0)
     }
   }
diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala
index cc1472ce9a603..b10bfd3884c73 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala
@@ -108,7 +108,7 @@ private[ml] class HingeAggregator(
   }
 
   /**
-   * Add a new training instance to this HingeAggregator, and update the loss and gradient
+   * Add a new training instance block to this HingeAggregator, and update the loss and gradient
    * of the objective function. 
* * @param block The InstanceBlock to be added. From bb57a84e5733671ee615ef71f7746563c006d6a3 Mon Sep 17 00:00:00 2001 From: zhengruifeng Date: Sun, 26 Jan 2020 08:35:45 +0800 Subject: [PATCH 04/10] add testsuites and reuse array in HingeAgg --- .../spark/ml/classification/LinearSVC.scala | 1 - .../apache/spark/ml/feature/Instance.scala | 8 ++--- .../ml/optim/aggregator/HingeAggregator.scala | 31 ++++++++++--------- .../spark/ml/feature/InstanceSuite.scala | 31 +++++++++++++++++++ .../aggregator/HingeAggregatorSuite.scala | 23 +++++++++++--- 5 files changed, 71 insertions(+), 23 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala index 6e27bab87448d..28a4d5ffa1606 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala @@ -290,7 +290,6 @@ class LinearSVC @Since("2.2.0") ( (Vectors.dense(coefficientArray), intercept, scaledObjectiveHistory.result()) } - copyValues(new LinearSVCModel(uid, coefficientVector, interceptVector)) } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala index ec2ba9c0c8aaf..c09a7c490170b 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala @@ -64,11 +64,11 @@ private[spark] class InstanceBlock( def labelIter: Iterator[Double] = labels.iterator - def getWeight(i: Int): Double = { + @transient lazy val getWeight: Int => Double = { if (weights.nonEmpty) { - weights(i) + (i: Int) => weights(i) } else { - 1.0 + (i: Int) => 1.0 } } @@ -81,7 +81,7 @@ private[spark] class InstanceBlock( } // directly get the non-zero iterator of i-th row vector without array copy or slice - val getNonZeroIter: Int => Iterator[(Int, Double)] = { + @transient lazy val getNonZeroIter: Int => Iterator[(Int, Double)] = { featureMatrix match { case dm: DenseMatrix => (i: Int) => diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala index b10bfd3884c73..31b21ba2a1807 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala @@ -124,14 +124,17 @@ private[ml] class HingeAggregator( val size = block.size val localGradientSumArray = gradientSumArray - val dots = if (intercept != 0) { - new DenseVector(Array.fill(size)(intercept)) + val arr = if (intercept != 0) { + Array.fill(size)(intercept) } else { - new DenseVector(Array.ofDim[Double](size)) + Array.ofDim[Double](size) } - BLAS.gemv(1.0, block.featureMatrix, linear, 1.0, dots) - val gradScaleArray = Array.ofDim[Double](size) + // vec here represents dotProducts + val vec = new DenseVector(arr) + BLAS.gemv(1.0, block.featureMatrix, linear, 1.0, vec) + + // in-place convert dotProducts to gradient scales var i = 0 while (i < size) { val weight = block.getWeight(i) @@ -141,27 +144,27 @@ private[ml] class HingeAggregator( // Therefore the gradient is -(2y - 1)*x val label = block.getLabel(i) val labelScaled = 2 * label - 1.0 - val loss = (1.0 - labelScaled * dots(i)) * weight + val loss = (1.0 - labelScaled * arr(i)) * weight if (loss > 0) { lossSum += loss - gradScaleArray(i) = -labelScaled * weight + arr(i) 
= -labelScaled * weight + } else { + arr(i) = 0.0 } } i += 1 } - // predictions are all correct - if (gradScaleArray.forall(_ == 0)) return this - - val gradScaleVec = new DenseVector(gradScaleArray) + // predictions are all correct, no gradient signal + if (arr.forall(_ == 0)) return this if (fitIntercept) { - BLAS.gemv(1.0, block.featureMatrix.transpose, gradScaleVec, 0.0, linearGradSumVec) + BLAS.gemv(1.0, block.featureMatrix.transpose, vec, 0.0, linearGradSumVec) linearGradSumVec.foreachNonZero { (i, v) => localGradientSumArray(i) += v } - localGradientSumArray(numFeatures) += gradScaleArray.sum + localGradientSumArray(numFeatures) += arr.sum } else { val gradSumVec = new DenseVector(localGradientSumArray) - BLAS.gemv(1.0, block.featureMatrix.transpose, gradScaleVec, 1.0, gradSumVec) + BLAS.gemv(1.0, block.featureMatrix.transpose, vec, 1.0, gradSumVec) } this diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala index 5a74490058398..afe01ea3fd7a8 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala @@ -42,5 +42,36 @@ class InstanceSuite extends SparkFunSuite{ val o2 = ser.deserialize[OffsetInstance](ser.serialize(o)) assert(o === o2) } + + val block1 = InstanceBlock.fromInstances(Seq(instance1)) + val block2 = InstanceBlock.fromInstances(Seq(instance1, instance2)) + Seq(block1, block2).foreach { o => + val o2 = ser.deserialize[InstanceBlock](ser.serialize(o)) + assert(o.labels === o2.labels) + assert(o.weights === o2.weights) + assert(o.featureMatrix === o2.featureMatrix) + } + } + + test("InstanceBlock: check correctness") { + val instance1 = Instance(19.0, 2.0, Vectors.dense(1.0, 7.0)) + val instance2 = Instance(17.0, 1.0, Vectors.dense(0.0, 5.0).toSparse) + val instances = Seq(instance1, instance2) + + val block = InstanceBlock.fromInstances(instances) + assert(block.size === 2) + assert(block.numFeatures === 2) + block.instanceIterator.zipWithIndex.foreach { + case (instance, i) => + assert(instance.label === instances(i).label) + assert(instance.weight === instances(i).weight) + assert(instance.features.toArray === instances(i).features.toArray) + } + Seq(0, 1).foreach { i => + val nzIter = block.getNonZeroIter(i) + val vec = Vectors.sparse(2, nzIter.toSeq) + assert(vec.toArray === instances(i).features.toArray) + } } + } diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala index 33b0eef9617b1..c02a0a5e5e7d0 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.ml.optim.aggregator import org.apache.spark.SparkFunSuite -import org.apache.spark.ml.feature.Instance +import org.apache.spark.ml.feature.{Instance, InstanceBlock} import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors} import org.apache.spark.ml.util.TestingUtils._ import org.apache.spark.mllib.util.MLlibTestSparkContext @@ -67,9 +67,7 @@ class HingeAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { val standardized = Array.ofDim[Double](numFeatures) features.foreachNonZero { (i, v) => val std = stdArray(i) - if (std != 0) { - standardized(i) = v / std - } + if (std != 0) standardized(i) = v / std } 
Instance(label, weight, Vectors.dense(standardized).compressed) } @@ -173,4 +171,21 @@ class HingeAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { assert(aggConstantFeatureBinary.gradient(1) == aggConstantFeatureBinaryFiltered.gradient(0)) } + test("add instance block") { + val coefArray = Array(1.0, 2.0) + val intercept = 1.0 + + val agg = getNewAggregator(instances, Vectors.dense(coefArray ++ Array(intercept)), + fitIntercept = true) + instances.foreach(agg.add) + + val agg2 = getNewAggregator(instances, Vectors.dense(coefArray ++ Array(intercept)), + fitIntercept = true) + val block = InstanceBlock.fromInstances(instances) + agg2.add(block) + + assert(agg.loss ~== agg2.loss relTol 1e-8) + assert(agg.gradient ~== agg2.gradient relTol 1e-8) + } + } From 1c1ce65e3eebc683168c9bdc67da4c6e00ad7ddb Mon Sep 17 00:00:00 2001 From: zhengruifeng Date: Sun, 26 Jan 2020 09:39:07 +0800 Subject: [PATCH 05/10] nit --- .../org/apache/spark/ml/classification/LinearSVC.scala | 7 +++---- .../spark/ml/optim/aggregator/HingeAggregator.scala | 8 ++++++-- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala index 28a4d5ffa1606..06be509c0ae43 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala @@ -217,22 +217,21 @@ class LinearSVC @Since("2.2.0") ( val featuresStd = summarizer.std.compressed val bcFeaturesStd = sc.broadcast(featuresStd) - val getFeaturesStd = (j: Int) => featuresStd(j) val regParamL2 = $(regParam) val regularization = if (regParamL2 != 0.0) { val shouldApply = (idx: Int) => idx >= 0 && idx < numFeatures Some(new L2Regularization(regParamL2, shouldApply, - if ($(standardization)) None else Some(getFeaturesStd))) + if ($(standardization)) None else Some(featuresStd.apply))) } else { None } val standardized = instances.map { case Instance(label, weight, features) => - val featuresStdVec = bcFeaturesStd.value + val featuresStd = bcFeaturesStd.value val array = Array.ofDim[Double](numFeatures) features.foreachNonZero { (i, v) => - val std = featuresStdVec(i) + val std = featuresStd(i) if (std != 0) array(i) = v / std } Instance(label, weight, Vectors.dense(array)) diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala index 31b21ba2a1807..7c92524f3cdec 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala @@ -130,11 +130,12 @@ private[ml] class HingeAggregator( Array.ofDim[Double](size) } - // vec here represents dotProducts + // arr/vec here represents dotProducts val vec = new DenseVector(arr) BLAS.gemv(1.0, block.featureMatrix, linear, 1.0, vec) // in-place convert dotProducts to gradient scales + // then, arr/vec represents gradient scales var i = 0 while (i < size) { val weight = block.getWeight(i) @@ -147,10 +148,13 @@ private[ml] class HingeAggregator( val loss = (1.0 - labelScaled * arr(i)) * weight if (loss > 0) { lossSum += loss - arr(i) = -labelScaled * weight + val gradScale = -labelScaled * weight + arr(i) = gradScale } else { arr(i) = 0.0 } + } else { + arr(i) = 0.0 } i += 1 } From 9f8d3b326633c2323dde3836c2913ee8edd46750 Mon Sep 17 00:00:00 2001 From: zhengruifeng 
Date: Sun, 26 Jan 2020 15:14:50 +0800 Subject: [PATCH 06/10] avoid @transient tuple assignment --- .../src/main/scala/org/apache/spark/ml/linalg/BLAS.scala | 1 - .../spark/ml/optim/aggregator/HingeAggregator.scala | 9 ++++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala index e054a15fc9b75..00e5b61dbdc18 100644 --- a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala +++ b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala @@ -682,7 +682,6 @@ private[spark] object BLAS extends Serializable { val xTemp = xValues(k) * alpha while (i < indEnd) { - val rowIndex = Arows(i) yValues(Arows(i)) += Avals(i) * xTemp i += 1 } diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala index 7c92524f3cdec..6eb9e1248515b 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala @@ -45,11 +45,14 @@ private[ml] class HingeAggregator( case _ => throw new IllegalArgumentException(s"coefficients only supports dense vector" + s" but got type ${bcCoefficients.value.getClass}.") } - @transient private lazy val (linear, intercept) = if (fitIntercept) { - (Vectors.dense(coefficientsArray.take(numFeatures)), coefficientsArray(numFeatures)) + + @transient private lazy val linear = if (fitIntercept) { + Vectors.dense(coefficientsArray.take(numFeatures)) } else { - (Vectors.dense(coefficientsArray), 0.0) + Vectors.dense(coefficientsArray) } + @transient private lazy val intercept = if (fitIntercept) coefficientsArray.last else 0.0 + @transient private lazy val linearGradSumVec = if (fitIntercept) { new DenseVector(Array.ofDim[Double](numFeatures)) } else { From b09a343f07fab4986b63e8286dd752955e4d5d97 Mon Sep 17 00:00:00 2001 From: zhengruifeng Date: Sun, 26 Jan 2020 15:44:50 +0800 Subject: [PATCH 07/10] nit --- .../org/apache/spark/ml/optim/aggregator/HingeAggregator.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala index 6eb9e1248515b..31de72d420be4 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala @@ -51,7 +51,7 @@ private[ml] class HingeAggregator( } else { Vectors.dense(coefficientsArray) } - @transient private lazy val intercept = if (fitIntercept) coefficientsArray.last else 0.0 + private val intercept = if (fitIntercept) bcCoefficients.value(numFeatures) else 0.0 @transient private lazy val linearGradSumVec = if (fitIntercept) { new DenseVector(Array.ofDim[Double](numFeatures)) From db60d68225150a0f75fb850ec39beb33f6602e08 Mon Sep 17 00:00:00 2001 From: zhengruifeng Date: Sun, 26 Jan 2020 15:47:09 +0800 Subject: [PATCH 08/10] nit --- .../ml/optim/aggregator/HingeAggregator.scala | 24 ++++++++++++------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala index 31de72d420be4..565d03fd94d21 100644 --- 
a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala @@ -46,17 +46,23 @@ private[ml] class HingeAggregator( s" but got type ${bcCoefficients.value.getClass}.") } - @transient private lazy val linear = if (fitIntercept) { - Vectors.dense(coefficientsArray.take(numFeatures)) - } else { - Vectors.dense(coefficientsArray) + @transient private lazy val linear = { + if (fitIntercept) { + new DenseVector(coefficientsArray.take(numFeatures)) + } else { + new DenseVector(coefficientsArray) + } } - private val intercept = if (fitIntercept) bcCoefficients.value(numFeatures) else 0.0 - @transient private lazy val linearGradSumVec = if (fitIntercept) { - new DenseVector(Array.ofDim[Double](numFeatures)) - } else { - null + private val intercept = + if (fitIntercept) bcCoefficients.value(numFeatures) else 0.0 + + @transient private lazy val linearGradSumVec = { + if (fitIntercept) { + new DenseVector(Array.ofDim[Double](numFeatures)) + } else { + null + } } From cb91306d6e14c3c882f9f10687bb8e843666d22d Mon Sep 17 00:00:00 2001 From: zhengruifeng Date: Mon, 27 Jan 2020 22:19:33 +0800 Subject: [PATCH 09/10] nit, reuse a vec among blocks --- .../spark/ml/classification/LinearSVC.scala | 2 +- .../ml/optim/aggregator/HingeAggregator.scala | 45 ++++++++++++------- 2 files changed, 30 insertions(+), 17 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala index 06be509c0ae43..6b1cdd8ad3963 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala @@ -240,7 +240,7 @@ class LinearSVC @Since("2.2.0") ( .persist(StorageLevel.MEMORY_AND_DISK) .setName(s"training dataset (blockSize=${$(blockSize)})") - val getAggregatorFunc = new HingeAggregator(numFeatures, $(fitIntercept))(_) + val getAggregatorFunc = new HingeAggregator(numFeatures, $(fitIntercept), $(blockSize))(_) val costFun = new RDDLossFunction(blocks, getAggregatorFunc, regularization, $(aggregationDepth)) diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala index 565d03fd94d21..5b7b3de9bd23b 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala @@ -35,7 +35,8 @@ import org.apache.spark.ml.linalg._ */ private[ml] class HingeAggregator( numFeatures: Int, - fitIntercept: Boolean)(bcCoefficients: Broadcast[Vector]) + fitIntercept: Boolean, + blockSize: Int = 4096)(bcCoefficients: Broadcast[Vector]) extends DifferentiableLossAggregator[InstanceBlock, HingeAggregator] { private val numFeaturesPlusIntercept: Int = if (fitIntercept) numFeatures + 1 else numFeatures @@ -54,8 +55,8 @@ private[ml] class HingeAggregator( } } - private val intercept = - if (fitIntercept) bcCoefficients.value(numFeatures) else 0.0 + @transient private lazy val intercept = + if (fitIntercept) coefficientsArray(numFeatures) else 0.0 @transient private lazy val linearGradSumVec = { if (fitIntercept) { @@ -65,6 +66,9 @@ private[ml] class HingeAggregator( } } + @transient private lazy val auxiliaryVec = + new DenseVector(Array.ofDim[Double](blockSize)) + /** * Add a new training instance to this HingeAggregator, and 
update the loss and gradient
@@ -133,18 +137,27 @@ private[ml] class HingeAggregator(
     val size = block.size
     val localGradientSumArray = gradientSumArray
 
-    val arr = if (intercept != 0) {
-      Array.fill(size)(intercept)
+    // vec here represents dotProducts
+    val vec = if (size == blockSize) {
+      auxiliaryVec
     } else {
-      Array.ofDim[Double](size)
+      // the last block within one partition may be of size less than blockSize
+      new DenseVector(Array.ofDim[Double](size))
     }
 
-    // arr/vec here represents dotProducts
-    val vec = new DenseVector(arr)
-    BLAS.gemv(1.0, block.featureMatrix, linear, 1.0, vec)
+    if (fitIntercept) {
+      var i = 0
+      while (i < size) {
+        vec.values(i) = intercept
+        i += 1
+      }
+      BLAS.gemv(1.0, block.featureMatrix, linear, 1.0, vec)
+    } else {
+      BLAS.gemv(1.0, block.featureMatrix, linear, 0.0, vec)
+    }
 
     // in-place convert dotProducts to gradient scales
-    // then, arr/vec represents gradient scales
+    // then, vec represents gradient scales
     var i = 0
     while (i < size) {
       val weight = block.getWeight(i)
@@ -154,27 +167,27 @@ private[ml] class HingeAggregator(
         // Therefore the gradient is -(2y - 1)*x
         val label = block.getLabel(i)
         val labelScaled = 2 * label - 1.0
-        val loss = (1.0 - labelScaled * arr(i)) * weight
+        val loss = (1.0 - labelScaled * vec(i)) * weight
         if (loss > 0) {
           lossSum += loss
           val gradScale = -labelScaled * weight
-          arr(i) = gradScale
+          vec.values(i) = gradScale
         } else {
-          arr(i) = 0.0
+          vec.values(i) = 0.0
        }
       } else {
-        arr(i) = 0.0
+        vec.values(i) = 0.0
       }
       i += 1
     }
 
     // predictions are all correct, no gradient signal
-    if (arr.forall(_ == 0)) return this
+    if (vec.values.forall(_ == 0)) return this
 
     if (fitIntercept) {
       BLAS.gemv(1.0, block.featureMatrix.transpose, vec, 0.0, linearGradSumVec)
       linearGradSumVec.foreachNonZero { (i, v) => localGradientSumArray(i) += v }
-      localGradientSumArray(numFeatures) += arr.sum
+      localGradientSumArray(numFeatures) += vec.values.sum
     } else {
       val gradSumVec = new DenseVector(localGradientSumArray)
       BLAS.gemv(1.0, block.featureMatrix.transpose, vec, 1.0, gradSumVec)
     }

From 819227e618439d376b91367afdb5fa6d47b5913c Mon Sep 17 00:00:00 2001
From: zhengruifeng
Date: Tue, 28 Jan 2020 16:22:51 +0800
Subject: [PATCH 10/10] nit

---
 .../apache/spark/ml/feature/Instance.scala    | 22 +++++++++----------
 .../ml/optim/aggregator/HingeAggregator.scala |  8 +++----
 .../spark/ml/feature/InstanceSuite.scala      |  2 +-
 3 files changed, 16 insertions(+), 16 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala
index c09a7c490170b..5476a86eb9d76 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala
@@ -36,26 +36,26 @@ private[spark] case class Instance(label: Double, weight: Double, features: Vect
 * Class that represents a block of instances.
 * If all weights are 1, then an empty array is stored. 
*/ -private[spark] class InstanceBlock( - val labels: Array[Double], - val weights: Array[Double], - val featureMatrix: Matrix) { - require(labels.length == featureMatrix.numRows) - require(featureMatrix.isTransposed) +private[spark] case class InstanceBlock( + labels: Array[Double], + weights: Array[Double], + matrix: Matrix) { + require(labels.length == matrix.numRows) + require(matrix.isTransposed) if (weights.nonEmpty) { require(labels.length == weights.length) } def size: Int = labels.length - def numFeatures: Int = featureMatrix.numCols + def numFeatures: Int = matrix.numCols def instanceIterator: Iterator[Instance] = { if (weights.nonEmpty) { - labels.iterator.zip(weights.iterator).zip(featureMatrix.rowIter) + labels.iterator.zip(weights.iterator).zip(matrix.rowIter) .map { case ((label, weight), vec) => Instance(label, weight, vec) } } else { - labels.iterator.zip(featureMatrix.rowIter) + labels.iterator.zip(matrix.rowIter) .map { case (label, vec) => Instance(label, 1.0, vec) } } } @@ -82,7 +82,7 @@ private[spark] class InstanceBlock( // directly get the non-zero iterator of i-th row vector without array copy or slice @transient lazy val getNonZeroIter: Int => Iterator[(Int, Double)] = { - featureMatrix match { + matrix match { case dm: DenseMatrix => (i: Int) => val start = numFeatures * i @@ -147,7 +147,7 @@ private[spark] object InstanceBlock { new SparseMatrix(numRows, numCols, rowPtrs.result(), colIndices.result(), values.result(), true) } - new InstanceBlock(labels, weights, matrix) + InstanceBlock(labels, weights, matrix) } def blokify(instances: RDD[Instance], blockSize: Int): RDD[InstanceBlock] = { diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala index 5b7b3de9bd23b..25f7c9ddab42d 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala @@ -151,9 +151,9 @@ private[ml] class HingeAggregator( vec.values(i) = intercept i += 1 } - BLAS.gemv(1.0, block.featureMatrix, linear, 1.0, vec) + BLAS.gemv(1.0, block.matrix, linear, 1.0, vec) } else { - BLAS.gemv(1.0, block.featureMatrix, linear, 0.0, vec) + BLAS.gemv(1.0, block.matrix, linear, 0.0, vec) } // in-place convert dotProducts to gradient scales @@ -185,12 +185,12 @@ private[ml] class HingeAggregator( if (vec.values.forall(_ == 0)) return this if (fitIntercept) { - BLAS.gemv(1.0, block.featureMatrix.transpose, vec, 0.0, linearGradSumVec) + BLAS.gemv(1.0, block.matrix.transpose, vec, 0.0, linearGradSumVec) linearGradSumVec.foreachNonZero { (i, v) => localGradientSumArray(i) += v } localGradientSumArray(numFeatures) += vec.values.sum } else { val gradSumVec = new DenseVector(localGradientSumArray) - BLAS.gemv(1.0, block.featureMatrix.transpose, vec, 1.0, gradSumVec) + BLAS.gemv(1.0, block.matrix.transpose, vec, 1.0, gradSumVec) } this diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala index afe01ea3fd7a8..d780bdf5f5dc8 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala @@ -49,7 +49,7 @@ class InstanceSuite extends SparkFunSuite{ val o2 = ser.deserialize[InstanceBlock](ser.serialize(o)) assert(o.labels === o2.labels) assert(o.weights === o2.weights) - assert(o.featureMatrix === 
o2.featureMatrix) + assert(o.matrix === o2.matrix) } }
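
A closing note on the technique (illustration only, not part of any patch above): patches 02-10 replace the per-instance hinge update, one dot product and one scaled gradient add per instance, with block-level BLAS calls: a single gemv of the block matrix against the coefficients produces all dot products, that buffer is rewritten in place into per-row gradient scales, and a single gemv of the transposed block matrix folds the scales into the gradient sum. The standalone sketch below checks that the two formulations agree on a tiny block; the hand-rolled row-major gemv stands in for ml.linalg.BLAS.gemv, and all names are hypothetical.

object HingeBlockSketch {
  // y = A * x for a row-major (rows x cols) matrix, mirroring a dense gemv
  def gemv(a: Array[Double], rows: Int, cols: Int, x: Array[Double]): Array[Double] =
    Array.tabulate(rows)(i => (0 until cols).map(j => a(i * cols + j) * x(j)).sum)

  // y = A^T * x, the transposed product used for gradient accumulation
  def gemvT(a: Array[Double], rows: Int, cols: Int, x: Array[Double]): Array[Double] =
    Array.tabulate(cols)(j => (0 until rows).map(i => a(i * cols + j) * x(i)).sum)

  def main(args: Array[String]): Unit = {
    val (rows, cols) = (3, 2)
    val feats = Array(1.0, 2.0, 1.5, 1.0, 4.0, 0.5)  // 3 instances, 2 features
    val labels = Array(0.0, 1.0, 0.0)
    val weights = Array(0.1, 0.5, 0.3)
    val coef = Array(1.0, 2.0)

    // reference: per-instance hinge loss and gradient
    var loss1 = 0.0
    val grad1 = Array.ofDim[Double](cols)
    for (i <- 0 until rows) {
      val dot = (0 until cols).map(j => feats(i * cols + j) * coef(j)).sum
      val ls = 2 * labels(i) - 1.0
      val l = (1.0 - ls * dot) * weights(i)
      if (l > 0) {
        loss1 += l
        for (j <- 0 until cols) grad1(j) += feats(i * cols + j) * -ls * weights(i)
      }
    }

    // block-wise: one gemv for all dot products, an in-place rewrite into
    // gradient scales, then one transposed gemv for the gradient
    val dots = gemv(feats, rows, cols, coef)
    val scales = Array.tabulate(rows) { i =>
      val ls = 2 * labels(i) - 1.0
      if ((1.0 - ls * dots(i)) * weights(i) > 0) -ls * weights(i) else 0.0
    }
    val loss2 = (0 until rows)
      .map(i => math.max(0.0, (1.0 - (2 * labels(i) - 1.0) * dots(i)) * weights(i))).sum
    val grad2 = gemvT(feats, rows, cols, scales)

    assert(math.abs(loss1 - loss2) < 1e-12)
    assert(grad1.zip(grad2).forall { case (g1, g2) => math.abs(g1 - g2) < 1e-12 })
    println(s"loss=$loss1 gradient=${grad2.mkString("[", ", ", "]")}")
  }
}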