From 72991dbec4c5c5e38fa0ab74a6b83d87007a7f12 Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Mon, 4 Jul 2016 15:24:55 -0700 Subject: [PATCH 1/2] fix empty partition issue --- .../mllib/linalg/distributed/RowMatrix.scala | 29 ++++++++++++------- .../linalg/distributed/RowMatrixSuite.scala | 17 +++++++++++ 2 files changed, 36 insertions(+), 10 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index cd5209d0ebe20..4c980e67d9f49 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -538,20 +538,29 @@ class RowMatrix @Since("1.0.0") ( val col = numCols().toInt // split rows horizontally into smaller matrices, and compute QR for each of them val blockQRs = rows.glom().map { partRows => - val bdm = BDM.zeros[Double](partRows.length, col) - var i = 0 - partRows.foreach { row => - bdm(i, ::) := row.asBreeze.t - i += 1 + if (partRows.length == 0) { + None + } else { + val bdm = BDM.zeros[Double](partRows.length, col) + var i = 0 + partRows.foreach { row => + bdm(i, ::) := row.asBreeze.t + i += 1 + } + Some(breeze.linalg.qr.reduced(bdm).r) } - breeze.linalg.qr.reduced(bdm).r } // combine the R part from previous results vertically into a tall matrix - val combinedR = blockQRs.treeReduce{ (r1, r2) => - val stackedR = BDM.vertcat(r1, r2) - breeze.linalg.qr.reduced(stackedR).r - } + val combinedR = blockQRs.treeReduce { + case (Some(r1), Some(r2)) => + val stackedR = BDM.vertcat(r1, r2) + Some(breeze.linalg.qr.reduced(stackedR).r) + case (Some(r1), None) => Some(r1) + case (None, Some(r2)) => Some(r2) + case _ => None + }.get + val finalR = Matrices.fromBreeze(combinedR.toDenseMatrix) val finalQ = if (computeQ) { try { diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala index 7c4c6d8409c6c..7c9e14f8cee70 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala @@ -28,6 +28,7 @@ import org.apache.spark.SparkFunSuite import org.apache.spark.mllib.linalg.{Matrices, Vector, Vectors} import org.apache.spark.mllib.random.RandomRDDs import org.apache.spark.mllib.util.{LocalClusterSparkContext, MLlibTestSparkContext} +import org.apache.spark.mllib.util.TestingUtils._ class RowMatrixSuite extends SparkFunSuite with MLlibTestSparkContext { @@ -281,6 +282,22 @@ class RowMatrixSuite extends SparkFunSuite with MLlibTestSparkContext { assert(cov(i, j) === cov(j, i)) } } + + test("QR decomposition should aware of empty partition (SPARK-16369)") { + val mat: RowMatrix = new RowMatrix(sc.parallelize(denseData, 1)) + val qrResult = mat.tallSkinnyQR(true) + + val matWithEmptyPartition = new RowMatrix(sc.parallelize(denseData, 8)) + val qrResult2 = matWithEmptyPartition.tallSkinnyQR(true) + + assert(qrResult.Q.numCols() === qrResult2.Q.numCols(), "Q matrix ncol not match") + assert(qrResult.Q.numRows() === qrResult2.Q.numRows(), "Q matrix nrow not match") + qrResult.Q.rows.collect().zip(qrResult2.Q.rows.collect()) + .foreach(x => assert(x._1 ~== x._2 relTol 1E-8, "Q matrix not match")) + + qrResult.R.toArray.zip(qrResult2.R.toArray) + .foreach(x => assert(x._1 ~== x._2 relTol 1E-8, "R matrix not match")) + } } class RowMatrixClusterSuite extends SparkFunSuite with LocalClusterSparkContext { From f6c25d901f42838b090b5aa5c3cdf5467f54ca63 Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Tue, 5 Jul 2016 10:18:35 -0700 Subject: [PATCH 2/2] change to filter map --- .../mllib/linalg/distributed/RowMatrix.scala | 30 +++++++------------ 1 file changed, 11 insertions(+), 19 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index 4c980e67d9f49..86f8572e37b9a 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -537,29 +537,21 @@ class RowMatrix @Since("1.0.0") ( def tallSkinnyQR(computeQ: Boolean = false): QRDecomposition[RowMatrix, Matrix] = { val col = numCols().toInt // split rows horizontally into smaller matrices, and compute QR for each of them - val blockQRs = rows.glom().map { partRows => - if (partRows.length == 0) { - None - } else { - val bdm = BDM.zeros[Double](partRows.length, col) - var i = 0 - partRows.foreach { row => - bdm(i, ::) := row.asBreeze.t - i += 1 - } - Some(breeze.linalg.qr.reduced(bdm).r) + val blockQRs = rows.glom().filter(_.length != 0).map { partRows => + val bdm = BDM.zeros[Double](partRows.length, col) + var i = 0 + partRows.foreach { row => + bdm(i, ::) := row.asBreeze.t + i += 1 } + breeze.linalg.qr.reduced(bdm).r } // combine the R part from previous results vertically into a tall matrix - val combinedR = blockQRs.treeReduce { - case (Some(r1), Some(r2)) => - val stackedR = BDM.vertcat(r1, r2) - Some(breeze.linalg.qr.reduced(stackedR).r) - case (Some(r1), None) => Some(r1) - case (None, Some(r2)) => Some(r2) - case _ => None - }.get + val combinedR = blockQRs.treeReduce { (r1, r2) => + val stackedR = BDM.vertcat(r1, r2) + breeze.linalg.qr.reduced(stackedR).r + } val finalR = Matrices.fromBreeze(combinedR.toDenseMatrix) val finalQ = if (computeQ) {