From ea201aaeca8af1f75198c61a15c9afcf6dde5cdd Mon Sep 17 00:00:00 2001
From: Zheng RuiFeng
Date: Fri, 13 Jan 2017 14:55:50 +0800
Subject: [PATCH 1/4] create pr

---
 .../spark/ml/feature/MaxAbsScaler.scala | 34 +++++++++++------
 .../spark/ml/feature/MinMaxScaler.scala | 37 ++++++++++++++-----
 2 files changed, 50 insertions(+), 21 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala
index 85f9732f79f67..b9228c6a2cd16 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala
@@ -21,13 +21,10 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.annotation.Since
 import org.apache.spark.ml.{Estimator, Model}
-import org.apache.spark.ml.linalg.{Vector, Vectors, VectorUDT}
+import org.apache.spark.ml.linalg._
 import org.apache.spark.ml.param.{ParamMap, Params}
 import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol}
 import org.apache.spark.ml.util._
-import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors}
-import org.apache.spark.mllib.stat.Statistics
-import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.{StructField, StructType}
@@ -70,14 +67,27 @@ class MaxAbsScaler @Since("2.0.0") (@Since("2.0.0") override val uid: String)
   @Since("2.0.0")
   override def fit(dataset: Dataset[_]): MaxAbsScalerModel = {
     transformSchema(dataset.schema, logging = true)
-    val input: RDD[OldVector] = dataset.select($(inputCol)).rdd.map {
-      case Row(v: Vector) => OldVectors.fromML(v)
-    }
-    val summary = Statistics.colStats(input)
-    val minVals = summary.min.toArray
-    val maxVals = summary.max.toArray
-    val n = minVals.length
-    val maxAbs = Array.tabulate(n) { i => math.max(math.abs(minVals(i)), math.abs(maxVals(i))) }
+
+    val numFeatures = dataset.select($(inputCol)).first().getAs[Vector](0).size
+    val maxAbs = dataset.select($(inputCol)).rdd.map {
+      row => row.getAs[Vector](0)
+    }.treeAggregate[Array[Double]](Array.fill(numFeatures)(0.0))(
+      seqOp = {
+        case (max, vec) =>
+          vec.foreachActive {
+            case (i, v) =>
+              val abs = math.abs(v)
+              if (abs > max(i)) {
+                max(i) = abs
+              }
+          }
+          max
+      }, combOp = {
+        case (max1, max2) =>
+          max1.zip(max2).map {
+            case (m1, m2) => math.max(m1, m2)
+          }
+      })
 
     copyValues(new MaxAbsScalerModel(uid, Vectors.dense(maxAbs)).setParent(this))
   }
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala
index 19978c97d2cfd..ae11c554febfe 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala
@@ -25,11 +25,7 @@ import org.apache.spark.ml.linalg.{Vector, Vectors, VectorUDT}
 import org.apache.spark.ml.param.{DoubleParam, ParamMap, Params}
 import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol}
 import org.apache.spark.ml.util._
-import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors}
-import org.apache.spark.mllib.linalg.VectorImplicits._
-import org.apache.spark.mllib.stat.Statistics
 import org.apache.spark.mllib.util.MLUtils
-import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.{StructField, StructType}
@@ -117,11 +113,34 @@ class MinMaxScaler @Since("1.5.0") (@Since("1.5.0") override val uid: String)
   @Since("2.0.0")
   override def fit(dataset: Dataset[_]): MinMaxScalerModel = {
     transformSchema(dataset.schema, logging = true)
-    val input: RDD[OldVector] = dataset.select($(inputCol)).rdd.map {
-      case Row(v: Vector) => OldVectors.fromML(v)
-    }
-    val summary = Statistics.colStats(input)
-    copyValues(new MinMaxScalerModel(uid, summary.min, summary.max).setParent(this))
+
+    val numFeatures = dataset.select($(inputCol)).first().getAs[Vector](0).size
+    val (mins, maxs) = dataset.select($(inputCol)).rdd.map {
+      row => row.getAs[Vector](0)
+    }.treeAggregate[(Array[Double], Array[Double])](
+      (Array.fill(numFeatures)(Double.MaxValue), Array.fill(numFeatures)(Double.MinValue)))(
+      seqOp = {
+        case ((min, max), vec) =>
+          vec.foreachActive {
+            case (i, v) =>
+              if (v < min(i)) {
+                min(i) = v
+              } else if (v > max(i)) {
+                max(i) = v
+              }
+          }
+          (min, max)
+      }, combOp = {
+        case ((min1, max1), (min2, max2)) =>
+          (min1.zip(min2).map {
+            case (m1, m2) => math.min(m1, m2)
+          }, max1.zip(max2).map {
+            case (m1, m2) => math.max(m1, m2)
+          })
+      })
+
+    copyValues(new MinMaxScalerModel(uid, Vectors.dense(mins), Vectors.dense(maxs))
+      .setParent(this))
   }
 
   @Since("1.5.0")

From ff7786f12b0ca1ac122a75e443bd2358638117d6 Mon Sep 17 00:00:00 2001
From: Zheng RuiFeng
Date: Fri, 13 Jan 2017 17:44:07 +0800
Subject: [PATCH 2/4] update

---
 .../spark/ml/feature/MaxAbsScaler.scala | 29 ++++++---
 .../spark/ml/feature/MinMaxScaler.scala | 61 +++++++++++++++----
 2 files changed, 69 insertions(+), 21 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala
index b9228c6a2cd16..31af8f5c6aa68 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala
@@ -68,27 +68,40 @@ class MaxAbsScaler @Since("2.0.0") (@Since("2.0.0") override val uid: String)
   override def fit(dataset: Dataset[_]): MaxAbsScalerModel = {
     transformSchema(dataset.schema, logging = true)
 
-    val numFeatures = dataset.select($(inputCol)).first().getAs[Vector](0).size
     val maxAbs = dataset.select($(inputCol)).rdd.map {
       row => row.getAs[Vector](0)
-    }.treeAggregate[Array[Double]](Array.fill(numFeatures)(0.0))(
+    }.treeAggregate[Array[Double]](Array.emptyDoubleArray)(
      seqOp = {
+        case (max, vec) if max.isEmpty =>
+          vec.toArray.map(math.abs)
         case (max, vec) =>
+          require(max.length == vec.size,
+            s"Dimensions mismatch when adding new sample: ${max.length} != ${vec.size}")
           vec.foreachActive {
-            case (i, v) =>
-              val abs = math.abs(v)
-              if (abs > max(i)) {
-                max(i) = abs
+            case (i, v) if v != 0.0 =>
+              val av = math.abs(v)
+              if (av > max(i)) {
+                max(i) = av
               }
+            case _ =>
           }
           max
       }, combOp = {
+        case (max1, max2) if max1.isEmpty =>
+          max2
+        case (max1, max2) if max2.isEmpty =>
+          max1
         case (max1, max2) =>
-          max1.zip(max2).map {
-            case (m1, m2) => math.max(m1, m2)
+          require(max1.length == max2.length,
+            s"Dimensions mismatch when merging: ${max1.length} != ${max2.length}")
+          for (i <- 0 until max1.length) {
+            max1(i) = math.max(max1(i), max2(i))
           }
+          max1
       })
 
+    require(maxAbs.nonEmpty, "Input dataset must be non-empty")
+
     copyValues(new MaxAbsScalerModel(uid, Vectors.dense(maxAbs)).setParent(this))
   }
 
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala
index ae11c554febfe..3682b2fb2d153 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala
@@ -114,31 +114,66 @@ class MinMaxScaler @Since("1.5.0") (@Since("1.5.0") override val uid: String)
   override def fit(dataset: Dataset[_]): MinMaxScalerModel = {
     transformSchema(dataset.schema, logging = true)
 
-    val numFeatures = dataset.select($(inputCol)).first().getAs[Vector](0).size
-    val (mins, maxs) = dataset.select($(inputCol)).rdd.map {
+    val (mins, maxs, nnz, cnt) = dataset.select($(inputCol)).rdd.map {
       row => row.getAs[Vector](0)
-    }.treeAggregate[(Array[Double], Array[Double])](
-      (Array.fill(numFeatures)(Double.MaxValue), Array.fill(numFeatures)(Double.MinValue)))(
+    }.treeAggregate[(Array[Double], Array[Double], Array[Long], Long)](
+      (Array.emptyDoubleArray, Array.emptyDoubleArray, Array.emptyLongArray, 0L))(
       seqOp = {
-        case ((min, max), vec) =>
+        case ((min, max, nnz, cnt), vec) if cnt == 0 =>
+          val n = vec.size
+          val min_ = Array.fill[Double](n)(Double.MaxValue)
+          val max_ = Array.fill[Double](n)(Double.MinValue)
+          val nnz_ = Array.fill[Long](n)(0L)
           vec.foreachActive {
-            case (i, v) =>
+            case (i, v) if v != 0.0 =>
+              min_(i) = v
+              max_(i) = v
+              nnz_(i) = 1L
+            case _ =>
+          }
+          (min_, max_, nnz_, 1L)
+        case ((min, max, nnz, cnt), vec) =>
+          require(min.length == vec.size,
+            s"Dimensions mismatch when adding new sample: ${min.length} != ${vec.size}")
+          vec.foreachActive {
+            case (i, v) if v != 0.0 =>
               if (v < min(i)) {
                 min(i) = v
               } else if (v > max(i)) {
                 max(i) = v
               }
+              nnz(i) += 1
+            case _ =>
           }
-          (min, max)
+          (min, max, nnz, cnt + 1)
       }, combOp = {
-        case ((min1, max1), (min2, max2)) =>
-          (min1.zip(min2).map {
-            case (m1, m2) => math.min(m1, m2)
-          }, max1.zip(max2).map {
-            case (m1, m2) => math.max(m1, m2)
-          })
+        case ((min1, max1, nnz1, cnt1), (min2, max2, nnz2, cnt2)) if cnt1 == 0 =>
+          (min2, max2, nnz2, cnt2)
+        case ((min1, max1, nnz1, cnt1), (min2, max2, nnz2, cnt2)) if cnt2 == 0 =>
+          (min1, max1, nnz1, cnt1)
+        case ((min1, max1, nnz1, cnt1), (min2, max2, nnz2, cnt2)) =>
+          require(min1.length == min2.length,
+            s"Dimensions mismatch when merging: ${min1.length} != ${min2.length}")
+          for (i <- 0 until min1.length) {
+            min1(i) = math.min(min1(i), min2(i))
+            max1(i) = math.max(max1(i), max2(i))
+            nnz1(i) += nnz2(i)
+          }
+          (min1, max1, nnz1, cnt1 + cnt2)
       })
 
+    require(cnt > 0, "Input dataset must be non-empty")
+
+    for(i <- 0 until mins.length) {
+      if (nnz(i) < cnt) {
+        if (mins(i) > 0.0) {
+          mins(i) = 0.0
+        } else if (maxs(i) < 0.0) {
+          maxs(i) = 0.0
+        }
+      }
+    }
+
     copyValues(new MinMaxScalerModel(uid, Vectors.dense(mins), Vectors.dense(maxs))
       .setParent(this))
   }

From 671e5660276b72018414569f8b86b2713c432836 Mon Sep 17 00:00:00 2001
From: Zheng RuiFeng
Date: Fri, 13 Jan 2017 19:41:07 +0800
Subject: [PATCH 3/4] fix for nan

---
 .../org/apache/spark/ml/feature/MinMaxScaler.scala | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala
index 3682b2fb2d153..5b96cc47078ab 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala
@@ -125,7 +125,7 @@ class MinMaxScaler @Since("1.5.0") (@Since("1.5.0") override val uid: String)
           val max_ = Array.fill[Double](n)(Double.MinValue)
           val nnz_ = Array.fill[Long](n)(0L)
           vec.foreachActive {
-            case (i, v) if v != 0.0 =>
+            case (i, v) if v != 0.0 && !v.isNaN =>
               min_(i) = v
               max_(i) = v
               nnz_(i) = 1L
@@ -136,10 +136,11 @@ class MinMaxScaler @Since("1.5.0") (@Since("1.5.0") override val uid: String)
           require(min.length == vec.size,
             s"Dimensions mismatch when adding new sample: ${min.length} != ${vec.size}")
           vec.foreachActive {
-            case (i, v) if v != 0.0 =>
+            case (i, v) if v != 0.0 && !v.isNaN =>
               if (v < min(i)) {
                 min(i) = v
-              } else if (v > max(i)) {
+              }
+              if (v > max(i)) {
                 max(i) = v
               }
               nnz(i) += 1
@@ -168,7 +169,8 @@ class MinMaxScaler @Since("1.5.0") (@Since("1.5.0") override val uid: String)
       if (nnz(i) < cnt) {
         if (mins(i) > 0.0) {
           mins(i) = 0.0
-        } else if (maxs(i) < 0.0) {
+        }
+        if (maxs(i) < 0.0) {
           maxs(i) = 0.0
         }
       }

From 5839ac206b4c011f042480895b9a9bc388f01292 Mon Sep 17 00:00:00 2001
From: Zheng RuiFeng
Date: Fri, 13 Jan 2017 19:54:22 +0800
Subject: [PATCH 4/4] fix for nan II

---
 .../org/apache/spark/ml/feature/MinMaxScaler.scala | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala
index 5b96cc47078ab..de6badf133f64 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala
@@ -125,9 +125,13 @@ class MinMaxScaler @Since("1.5.0") (@Since("1.5.0") override val uid: String)
           val max_ = Array.fill[Double](n)(Double.MinValue)
           val nnz_ = Array.fill[Long](n)(0L)
           vec.foreachActive {
-            case (i, v) if v != 0.0 && !v.isNaN =>
-              min_(i) = v
-              max_(i) = v
+            case (i, v) if v != 0.0 =>
+              if (v < min_(i)) {
+                min_(i) = v
+              }
+              if (v > max_(i)) {
+                max_(i) = v
+              }
               nnz_(i) = 1L
             case _ =>
           }
@@ -136,7 +140,7 @@ class MinMaxScaler @Since("1.5.0") (@Since("1.5.0") override val uid: String)
           require(min.length == vec.size,
             s"Dimensions mismatch when adding new sample: ${min.length} != ${vec.size}")
           vec.foreachActive {
-            case (i, v) if v != 0.0 && !v.isNaN =>
+            case (i, v) if v != 0.0 =>
               if (v < min(i)) {
                 min(i) = v
               }
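
A minimal local sketch of the per-feature min/max update these patches run inside RDD.treeAggregate (illustrative only; the object MinMaxSketch, the helper minMax, and the toy data are hypothetical and not part of the patches):

import org.apache.spark.ml.linalg.{Vector, Vectors}

object MinMaxSketch {
  // Same per-feature update as the seqOp above, applied to a local Seq.
  // foreachActive visits only explicitly stored (non-zero) entries, which is
  // why the patches also track nnz and later pull min/max toward 0.0 for
  // columns that have implicit zeros.
  def minMax(vectors: Seq[Vector]): (Array[Double], Array[Double]) = {
    val n = vectors.head.size
    val mins = Array.fill(n)(Double.MaxValue)
    val maxs = Array.fill(n)(Double.MinValue)
    vectors.foreach { vec =>
      require(vec.size == n, s"Dimensions mismatch: ${vec.size} != $n")
      vec.foreachActive { (i, v) =>
        if (v < mins(i)) mins(i) = v
        if (v > maxs(i)) maxs(i) = v
      }
    }
    (mins, maxs)
  }

  def main(args: Array[String]): Unit = {
    val data = Seq(
      Vectors.dense(1.0, -2.0, 3.0),
      Vectors.sparse(3, Array(0), Array(-4.0)))
    val (mins, maxs) = minMax(data)
    println(mins.mkString(", ")) // -4.0, -2.0, 3.0
    println(maxs.mkString(", ")) // 1.0, -2.0, 3.0
  }
}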