From b835c532a583d28752fe2c99604800ab101de04f Mon Sep 17 00:00:00 2001 From: Andrew-Crosby Date: Sat, 15 Jun 2019 20:33:08 +0100 Subject: [PATCH 1/2] Avoid unnecessary copy of coefficients vector each time an instance is added. Follows the approach used in LeastSquaresAggregator. --- .../apache/spark/ml/optim/aggregator/HuberAggregator.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HuberAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HuberAggregator.scala index 13f64d2d50424..137a505f4c9dc 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HuberAggregator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HuberAggregator.scala @@ -81,6 +81,9 @@ private[ml] class HuberAggregator( } else { 0.0 } + // make transient so we do not serialize between aggregation stages + @transient private lazy val featuresStd = bcFeaturesStd.value + @transient private lazy val coefficients = bcParameters.value.toArray.slice(0, numFeatures) /** * Add a new training instance to this HuberAggregator, and update the loss and gradient @@ -96,8 +99,8 @@ private[ml] class HuberAggregator( require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0") if (weight == 0.0) return this - val localFeaturesStd = bcFeaturesStd.value - val localCoefficients = bcParameters.value.toArray.slice(0, numFeatures) + val localFeaturesStd = featuresStd + val localCoefficients = coefficients val localGradientSumArray = gradientSumArray val margin = { From cdb49c06ed2b6e008486fc1af294c583e83fafbd Mon Sep 17 00:00:00 2001 From: Andrew-Crosby Date: Tue, 18 Jun 2019 20:10:39 +0100 Subject: [PATCH 2/2] Revert unnecessary change to featuresStd --- .../org/apache/spark/ml/optim/aggregator/HuberAggregator.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HuberAggregator.scala 
b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HuberAggregator.scala index 137a505f4c9dc..fc4c423a60b2a 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HuberAggregator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HuberAggregator.scala @@ -82,7 +82,6 @@ private[ml] class HuberAggregator( 0.0 } // make transient so we do not serialize between aggregation stages - @transient private lazy val featuresStd = bcFeaturesStd.value @transient private lazy val coefficients = bcParameters.value.toArray.slice(0, numFeatures) /** @@ -99,7 +98,7 @@ private[ml] class HuberAggregator( require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0") if (weight == 0.0) return this - val localFeaturesStd = featuresStd + val localFeaturesStd = bcFeaturesStd.value val localCoefficients = coefficients val localGradientSumArray = gradientSumArray