diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala
index 85f9732f79f67..31af8f5c6aa68 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala
@@ -21,13 +21,10 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.annotation.Since
 import org.apache.spark.ml.{Estimator, Model}
-import org.apache.spark.ml.linalg.{Vector, Vectors, VectorUDT}
+import org.apache.spark.ml.linalg._
 import org.apache.spark.ml.param.{ParamMap, Params}
 import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol}
 import org.apache.spark.ml.util._
-import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors}
-import org.apache.spark.mllib.stat.Statistics
-import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.{StructField, StructType}
@@ -70,14 +67,40 @@ class MaxAbsScaler @Since("2.0.0") (@Since("2.0.0") override val uid: String)
   @Since("2.0.0")
   override def fit(dataset: Dataset[_]): MaxAbsScalerModel = {
     transformSchema(dataset.schema, logging = true)
-    val input: RDD[OldVector] = dataset.select($(inputCol)).rdd.map {
-      case Row(v: Vector) => OldVectors.fromML(v)
-    }
-    val summary = Statistics.colStats(input)
-    val minVals = summary.min.toArray
-    val maxVals = summary.max.toArray
-    val n = minVals.length
-    val maxAbs = Array.tabulate(n) { i => math.max(math.abs(minVals(i)), math.abs(maxVals(i))) }
+
+    val maxAbs = dataset.select($(inputCol)).rdd.map {
+      row => row.getAs[Vector](0)
+    }.treeAggregate[Array[Double]](Array.emptyDoubleArray)(
+      seqOp = {
+        case (max, vec) if max.isEmpty =>
+          vec.toArray.map(math.abs)
+        case (max, vec) =>
+          require(max.length == vec.size,
+            s"Dimensions mismatch when adding new sample: ${max.length} != ${vec.size}")
+          vec.foreachActive {
+            case (i, v) if v != 0.0 =>
+              val av = math.abs(v)
+              if (av > max(i)) {
+                max(i) = av
+              }
+            case _ =>
+          }
+          max
+      }, combOp = {
+        case (max1, max2) if max1.isEmpty =>
+          max2
+        case (max1, max2) if max2.isEmpty =>
+          max1
+        case (max1, max2) =>
+          require(max1.length == max2.length,
+            s"Dimensions mismatch when merging: ${max1.length} != ${max2.length}")
+          for (i <- 0 until max1.length) {
+            max1(i) = math.max(max1(i), max2(i))
+          }
+          max1
+      })
+
+    require(maxAbs.nonEmpty, "Input dataset must be non-empty")
 
     copyValues(new MaxAbsScalerModel(uid, Vectors.dense(maxAbs)).setParent(this))
   }
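Note on the MaxAbsScaler change: fit now computes the per-feature maximum absolute values in a single treeAggregate pass over ml.linalg vectors instead of converting to mllib vectors and calling Statistics.colStats. The empty zero value lets the first sample fix the dimensionality, seqOp folds the absolute value of every active entry into the running maxima (implicit zeros can never raise a maximum of absolute values), and combOp merges partial maxima elementwise. Below is a minimal usage sketch, not part of the patch; the SparkSession named spark and the toy vectors are illustrative assumptions.

import org.apache.spark.ml.feature.MaxAbsScaler
import org.apache.spark.ml.linalg.Vectors

// Toy data: the per-feature max absolute values are [4.0, 10.0, 8.0].
val df = spark.createDataFrame(Seq(
  (0, Vectors.dense(1.0, 0.1, -8.0)),
  (1, Vectors.dense(2.0, 1.0, -4.0)),
  (2, Vectors.dense(4.0, 10.0, 8.0))
)).toDF("id", "features")

val model = new MaxAbsScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .fit(df)                        // single treeAggregate pass over "features"

println(model.maxAbs)             // expected: [4.0,10.0,8.0]
model.transform(df).show(false)   // each feature divided by its max absolute value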
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala
index 19978c97d2cfd..de6badf133f64 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala
@@ -25,11 +25,7 @@ import org.apache.spark.ml.linalg.{Vector, Vectors, VectorUDT}
 import org.apache.spark.ml.param.{DoubleParam, ParamMap, Params}
 import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol}
 import org.apache.spark.ml.util._
-import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors}
-import org.apache.spark.mllib.linalg.VectorImplicits._
-import org.apache.spark.mllib.stat.Statistics
 import org.apache.spark.mllib.util.MLUtils
-import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.{StructField, StructType}
@@ -117,11 +113,75 @@ class MinMaxScaler @Since("1.5.0") (@Since("1.5.0") override val uid: String)
   @Since("2.0.0")
   override def fit(dataset: Dataset[_]): MinMaxScalerModel = {
     transformSchema(dataset.schema, logging = true)
-    val input: RDD[OldVector] = dataset.select($(inputCol)).rdd.map {
-      case Row(v: Vector) => OldVectors.fromML(v)
+
+    val (mins, maxs, nnz, cnt) = dataset.select($(inputCol)).rdd.map {
+      row => row.getAs[Vector](0)
+    }.treeAggregate[(Array[Double], Array[Double], Array[Long], Long)](
+      (Array.emptyDoubleArray, Array.emptyDoubleArray, Array.emptyLongArray, 0L))(
+      seqOp = {
+        case ((min, max, nnz, cnt), vec) if cnt == 0 =>
+          val n = vec.size
+          val min_ = Array.fill[Double](n)(Double.MaxValue)
+          val max_ = Array.fill[Double](n)(Double.MinValue)
+          val nnz_ = Array.fill[Long](n)(0L)
+          vec.foreachActive {
+            case (i, v) if v != 0.0 =>
+              if (v < min_(i)) {
+                min_(i) = v
+              }
+              if (v > max_(i)) {
+                max_(i) = v
+              }
+              nnz_(i) = 1L
+            case _ =>
+          }
+          (min_, max_, nnz_, 1L)
+        case ((min, max, nnz, cnt), vec) =>
+          require(min.length == vec.size,
+            s"Dimensions mismatch when adding new sample: ${min.length} != ${vec.size}")
+          vec.foreachActive {
+            case (i, v) if v != 0.0 =>
+              if (v < min(i)) {
+                min(i) = v
+              }
+              if (v > max(i)) {
+                max(i) = v
+              }
+              nnz(i) += 1
+            case _ =>
+          }
+          (min, max, nnz, cnt + 1)
+      }, combOp = {
+        case ((min1, max1, nnz1, cnt1), (min2, max2, nnz2, cnt2)) if cnt1 == 0 =>
+          (min2, max2, nnz2, cnt2)
+        case ((min1, max1, nnz1, cnt1), (min2, max2, nnz2, cnt2)) if cnt2 == 0 =>
+          (min1, max1, nnz1, cnt1)
+        case ((min1, max1, nnz1, cnt1), (min2, max2, nnz2, cnt2)) =>
+          require(min1.length == min2.length,
+            s"Dimensions mismatch when merging: ${min1.length} != ${min2.length}")
+          for (i <- 0 until min1.length) {
+            min1(i) = math.min(min1(i), min2(i))
+            max1(i) = math.max(max1(i), max2(i))
+            nnz1(i) += nnz2(i)
+          }
+          (min1, max1, nnz1, cnt1 + cnt2)
+      })
+
+    require(cnt > 0, "Input dataset must be non-empty")
+
+    for (i <- 0 until mins.length) {
+      if (nnz(i) < cnt) {
+        if (mins(i) > 0.0) {
+          mins(i) = 0.0
+        }
+        if (maxs(i) < 0.0) {
+          maxs(i) = 0.0
+        }
+      }
     }
-    val summary = Statistics.colStats(input)
-    copyValues(new MinMaxScalerModel(uid, summary.min, summary.max).setParent(this))
+
+    copyValues(new MinMaxScalerModel(uid, Vectors.dense(mins), Vectors.dense(maxs))
+      .setParent(this))
   }
 
   @Since("1.5.0")
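Note on the MinMaxScaler change: the single treeAggregate pass tracks, per feature, the observed minimum, the observed maximum, and the number of non-zero entries (nnz), plus the total row count. Because foreachActive visits only explicitly stored entries, a feature with nnz < cnt also takes the implicit value 0.0 in some row, so after the pass its minimum is clamped down to 0.0 and its maximum up to 0.0, matching the sparse-aware behaviour of the old Statistics.colStats summary. A small illustrative sketch of that adjustment follows, not part of the patch; the SparkSession named spark and the made-up input are assumptions.

import org.apache.spark.ml.feature.MinMaxScaler
import org.apache.spark.ml.linalg.Vectors

// Feature 1 is explicitly stored only in the middle row; the other two rows hold an
// implicit 0.0 there, so nnz(1) = 1 < cnt = 3 and its fitted minimum is pulled to 0.0.
val df = spark.createDataFrame(Seq(
  (0, Vectors.sparse(2, Array(0), Array(1.0))),
  (1, Vectors.dense(3.0, 2.0)),
  (2, Vectors.sparse(2, Array(0), Array(5.0)))
)).toDF("id", "features")

val model = new MinMaxScaler()
  .setInputCol("features")
  .setOutputCol("scaled")
  .fit(df)

// Expected under the adjustment above: originalMin = [1.0,0.0], originalMax = [5.0,2.0]
println(s"min=${model.originalMin} max=${model.originalMax}")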