47 changes: 35 additions & 12 deletions mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala
@@ -21,13 +21,10 @@ import org.apache.hadoop.fs.Path

import org.apache.spark.annotation.Since
import org.apache.spark.ml.{Estimator, Model}
import org.apache.spark.ml.linalg.{Vector, Vectors, VectorUDT}
import org.apache.spark.ml.linalg._
import org.apache.spark.ml.param.{ParamMap, Params}
import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol}
import org.apache.spark.ml.util._
import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors}
import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructField, StructType}
@@ -70,14 +67,40 @@ class MaxAbsScaler @Since("2.0.0") (@Since("2.0.0") override val uid: String)
@Since("2.0.0")
override def fit(dataset: Dataset[_]): MaxAbsScalerModel = {
transformSchema(dataset.schema, logging = true)
val input: RDD[OldVector] = dataset.select($(inputCol)).rdd.map {
case Row(v: Vector) => OldVectors.fromML(v)
}
val summary = Statistics.colStats(input)
val minVals = summary.min.toArray
val maxVals = summary.max.toArray
val n = minVals.length
val maxAbs = Array.tabulate(n) { i => math.max(math.abs(minVals(i)), math.abs(maxVals(i))) }

val maxAbs = dataset.select($(inputCol)).rdd.map {
row => row.getAs[Vector](0)
Contributor:
Would moving this inside the treeAggregate possibly make the computation faster, e.g. by operating on the rows of vectors directly? That way you wouldn't have to pass through the data twice.

Contributor:
Actually, I think it might make the code clearer to:
1.) map to Array[Double], similar to what you did with the vector, but take the abs
2.) instead of using treeAggregate, just do a simple reduce on the arrays, taking the max for each slot
That would simplify the code. Would that be worse performance-wise?
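A rough sketch of what that suggestion could look like (not part of this patch; it assumes the same fit() context as the diff, and note that toArray densifies every sparse row):

// Sketch of the reviewer's suggestion: per-row element-wise absolute values,
// then a per-slot maximum via a plain reduce.
val maxAbs = dataset.select($(inputCol)).rdd
  .map(_.getAs[Vector](0).toArray.map(math.abs))
  .reduce { (a, b) =>
    require(a.length == b.length,
      s"Dimensions mismatch when merging: ${a.length} != ${b.length}")
    var i = 0
    while (i < a.length) {
      a(i) = math.max(a(i), b(i))
      i += 1
    }
    a
  }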

    }.treeAggregate[Array[Double]](Array.emptyDoubleArray)(
      seqOp = {
        case (max, vec) if max.isEmpty =>
          vec.toArray.map(math.abs)
Contributor:
Is it the call to colStats and the vector conversion that was so inefficient? Do you have any performance numbers to justify the change, since it does make the code more complicated?

        case (max, vec) =>
          require(max.length == vec.size,
            s"Dimensions mismatch when adding new sample: ${max.length} != ${vec.size}")
          vec.foreachActive {
            case (i, v) if v != 0.0 =>
              val av = math.abs(v)
              if (av > max(i)) {
                max(i) = av
              }
            case _ =>
          }
          max
      }, combOp = {
Contributor:
I think it may make the code clearer to move the seqOp and combOp into separate methods.
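One possible shape of that refactoring, sketched for illustration only (the helper names maxAbsSeqOp and maxAbsCombOp are made up, not from the patch):

// Hypothetical helpers pulled out of the treeAggregate call.
private def maxAbsSeqOp(max: Array[Double], vec: Vector): Array[Double] = {
  if (max.isEmpty) {
    vec.toArray.map(math.abs)
  } else {
    require(max.length == vec.size,
      s"Dimensions mismatch when adding new sample: ${max.length} != ${vec.size}")
    vec.foreachActive { (i, v) =>
      val av = math.abs(v)
      if (av > max(i)) max(i) = av
    }
    max
  }
}

private def maxAbsCombOp(max1: Array[Double], max2: Array[Double]): Array[Double] = {
  if (max1.isEmpty) {
    max2
  } else if (max2.isEmpty) {
    max1
  } else {
    require(max1.length == max2.length,
      s"Dimensions mismatch when merging: ${max1.length} != ${max2.length}")
    for (i <- 0 until max1.length) {
      max1(i) = math.max(max1(i), max2(i))
    }
    max1
  }
}

With those in place, fit() would reduce to a single call: .treeAggregate(Array.emptyDoubleArray)(maxAbsSeqOp, maxAbsCombOp).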

        case (max1, max2) if max1.isEmpty =>
          max2
        case (max1, max2) if max2.isEmpty =>
          max1
        case (max1, max2) =>
          require(max1.length == max2.length,
            s"Dimensions mismatch when merging: ${max1.length} != ${max2.length}")
          for (i <- 0 until max1.length) {
            max1(i) = math.max(max1(i), max2(i))
          }
          max1
      })

    require(maxAbs.nonEmpty, "Input dataset must be non-empty")

    copyValues(new MaxAbsScalerModel(uid, Vectors.dense(maxAbs)).setParent(this))
  }
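For context, a minimal usage sketch of the estimator whose fit() is changed above (assuming an active SparkSession named spark; the data and column names are illustrative):

import org.apache.spark.ml.feature.MaxAbsScaler
import org.apache.spark.ml.linalg.Vectors

val df = spark.createDataFrame(Seq(
  (0, Vectors.dense(1.0, 0.1, -8.0)),
  (1, Vectors.dense(2.0, 1.0, -4.0)),
  (2, Vectors.dense(4.0, 10.0, 8.0))
)).toDF("id", "features")

val model = new MaxAbsScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .fit(df)                     // computes max |value| per column: (4.0, 10.0, 8.0)

model.transform(df).show()     // each feature divided by its column's max absolute value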
mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala
@@ -25,11 +25,7 @@ import org.apache.spark.ml.linalg.{Vector, Vectors, VectorUDT}
import org.apache.spark.ml.param.{DoubleParam, ParamMap, Params}
import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol}
import org.apache.spark.ml.util._
import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors}
import org.apache.spark.mllib.linalg.VectorImplicits._
import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructField, StructType}
@@ -117,11 +113,75 @@ class MinMaxScaler @Since("1.5.0") (@Since("1.5.0") override val uid: String)
@Since("2.0.0")
override def fit(dataset: Dataset[_]): MinMaxScalerModel = {
transformSchema(dataset.schema, logging = true)
val input: RDD[OldVector] = dataset.select($(inputCol)).rdd.map {
case Row(v: Vector) => OldVectors.fromML(v)

val (mins, maxs, nnz, cnt) = dataset.select($(inputCol)).rdd.map {
row => row.getAs[Vector](0)
}.treeAggregate[(Array[Double], Array[Double], Array[Long], Long)](
(Array.emptyDoubleArray, Array.emptyDoubleArray, Array.emptyLongArray, 0L))(
seqOp = {
case ((min, max, nnz, cnt), vec) if cnt == 0 =>
val n = vec.size
val min_ = Array.fill[Double](n)(Double.MaxValue)
val max_ = Array.fill[Double](n)(Double.MinValue)
val nnz_ = Array.fill[Long](n)(0L)
vec.foreachActive {
case (i, v) if v != 0.0 =>
if (v < min_(i)) {
min_(i) = v
}
if (v > max_(i)) {
max_(i) = v
}
nnz_(i) = 1L
case _ =>
}
(min_, max_, nnz_, 1L)
case ((min, max, nnz, cnt), vec) =>
require(min.length == vec.size,
s"Dimensions mismatch when adding new sample: ${min.length} != ${vec.size}")
vec.foreachActive {
case (i, v) if v != 0.0 =>
Contributor:
This looks the same as the code above; can you refactor it into a separate function and call it from both places?
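A possible shape of that helper, sketched here for illustration (the name updateInPlace is made up; with fresh arrays initialized to Double.MaxValue / Double.MinValue / 0L, the same helper would also cover the first-sample branch above, since += 1 on a zeroed slot gives 1):

// Hypothetical helper capturing the duplicated per-element min/max/nnz update.
private def updateInPlace(
    min: Array[Double],
    max: Array[Double],
    nnz: Array[Long],
    vec: Vector): Unit = {
  require(min.length == vec.size,
    s"Dimensions mismatch when adding new sample: ${min.length} != ${vec.size}")
  vec.foreachActive { (i, v) =>
    if (v != 0.0) {
      if (v < min(i)) min(i) = v
      if (v > max(i)) max(i) = v
      nnz(i) += 1L
    }
  }
}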

              if (v < min(i)) {
                min(i) = v
              }
              if (v > max(i)) {
                max(i) = v
              }
              nnz(i) += 1
            case _ =>
          }
          (min, max, nnz, cnt + 1)
      }, combOp = {
        case ((min1, max1, nnz1, cnt1), (min2, max2, nnz2, cnt2)) if cnt1 == 0 =>
          (min2, max2, nnz2, cnt2)
        case ((min1, max1, nnz1, cnt1), (min2, max2, nnz2, cnt2)) if cnt2 == 0 =>
          (min1, max1, nnz1, cnt1)
        case ((min1, max1, nnz1, cnt1), (min2, max2, nnz2, cnt2)) =>
          require(min1.length == min2.length,
            s"Dimensions mismatch when merging: ${min1.length} != ${min2.length}")
          for (i <- 0 until min1.length) {
            min1(i) = math.min(min1(i), min2(i))
            max1(i) = math.max(max1(i), max2(i))
            nnz1(i) += nnz2(i)
          }
          (min1, max1, nnz1, cnt1 + cnt2)
      })

    require(cnt > 0, "Input dataset must be non-empty")

    for (i <- 0 until mins.length) {
      if (nnz(i) < cnt) {
        if (mins(i) > 0.0) {
          mins(i) = 0.0
        }
        if (maxs(i) < 0.0) {
          maxs(i) = 0.0
        }
      }
    }
    val summary = Statistics.colStats(input)
    copyValues(new MinMaxScalerModel(uid, summary.min, summary.max).setParent(this))

    copyValues(new MinMaxScalerModel(uid, Vectors.dense(mins), Vectors.dense(maxs))
      .setParent(this))
  }

@Since("1.5.0")
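And a matching usage sketch for MinMaxScaler (again assuming an active SparkSession named spark; data and column names are illustrative):

import org.apache.spark.ml.feature.MinMaxScaler
import org.apache.spark.ml.linalg.Vectors

val df = spark.createDataFrame(Seq(
  (0, Vectors.dense(1.0, 0.1, -1.0)),
  (1, Vectors.dense(2.0, 1.1, 1.0)),
  (2, Vectors.dense(3.0, 10.1, 3.0))
)).toDF("id", "features")

val model = new MinMaxScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .fit(df)                      // runs the single-pass treeAggregate shown in the diff above

model.transform(df).show()      // each feature rescaled to [0.0, 1.0] by default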