From 4d7c2977095c52e45470c9b7c19dcb24e74eee43 Mon Sep 17 00:00:00 2001
From: Fokko Driesprong
Date: Tue, 19 Jan 2016 22:47:13 +0100
Subject: [PATCH 1/4] Implemented an improved version of the toIndexedRowMatrix
 method of the BlockMatrix

---
 .../linalg/distributed/BlockMatrix.scala | 26 ++++++++++++++++++++++----
 1 file changed, 22 insertions(+), 4 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
index 09527dcf5d9e5..ca32041be858c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
@@ -23,7 +23,7 @@ import breeze.linalg.{DenseMatrix => BDM}
 
 import org.apache.spark.{Logging, Partitioner, SparkException}
 import org.apache.spark.annotation.Since
-import org.apache.spark.mllib.linalg.{DenseMatrix, Matrices, Matrix, SparseMatrix}
+import org.apache.spark.mllib.linalg._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 
@@ -262,14 +262,32 @@ class BlockMatrix @Since("1.3.0") (
     }
     new CoordinateMatrix(entryRDD, numRows(), numCols())
   }
-
+
   /** Converts to IndexedRowMatrix. The number of columns must be within the integer range. */
   @Since("1.3.0")
   def toIndexedRowMatrix(): IndexedRowMatrix = {
     require(numCols() < Int.MaxValue, "The number of columns must be within the integer range. " +
       s"numCols: ${numCols()}")
-    // TODO: This implementation may be optimized
-    toCoordinateMatrix().toIndexedRowMatrix()
+
+    val rows = blocks.map(block => (block._1._1, (block._1._2, block._2)))
+      .groupByKey()
+      .flatMap { case (row, matricesItr) =>
+
+        val rows = matricesItr.head._2.numRows
+        val res = BDM.zeros[Double](rows, numCols.toInt)
+
+        matricesItr.foreach { case ((idx: Int, mat: Matrix)) =>
+          val offset = colsPerBlock * idx
+          res(0 until mat.numRows, offset until offset + mat.numCols) := mat.toBreeze
+        }
+
+        (0 until rows).map(idx => new IndexedRow(
+          (row * rowsPerBlock) + idx,
+          Vectors.dense(res.t(::, idx).toArray)
+        ))
+      }
+
+    new IndexedRowMatrix(rows)
   }
 
   /** Collect the distributed matrix on the driver as a `DenseMatrix`. */
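Note: this first version keys every block by its block-row index, groups a whole
block row onto a single executor, and copies each block into a rows x numCols
dense buffer before emitting one IndexedRow per matrix row. The implementation
it replaces went through toCoordinateMatrix(), which expands every entry into
its own MatrixEntry record before regrouping by row. A minimal usage sketch of
the conversion, assuming an existing SparkContext named sc (the values are
illustrative):

    import org.apache.spark.mllib.linalg.Matrices
    import org.apache.spark.mllib.linalg.distributed.BlockMatrix

    // A 4 x 4 matrix stored as four 2 x 2 blocks; block (i, j) covers rows
    // [2 * i, 2 * i + 2) and columns [2 * j, 2 * j + 2) of the full matrix.
    val blocks = sc.parallelize(Seq(
      ((0, 0), Matrices.dense(2, 2, Array(1.0, 2.0, 3.0, 4.0))),
      ((0, 1), Matrices.dense(2, 2, Array(5.0, 6.0, 7.0, 8.0))),
      ((1, 0), Matrices.dense(2, 2, Array(9.0, 10.0, 11.0, 12.0))),
      ((1, 1), Matrices.dense(2, 2, Array(13.0, 14.0, 15.0, 16.0)))))
    val mat = new BlockMatrix(blocks, 2, 2) // rowsPerBlock = 2, colsPerBlock = 2

    // Four IndexedRow(index, vector) records, one per matrix row.
    mat.toIndexedRowMatrix().rows.collect().foreach(println)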
From fe1842ea343b1a845e67f47e28aa4ef33818f9cb Mon Sep 17 00:00:00 2001
From: Fokko Driesprong
Date: Sun, 13 Mar 2016 19:26:39 +0100
Subject: [PATCH 2/4] Updated the method based on the suggestion of @mengxr

---
 .../linalg/distributed/BlockMatrix.scala | 33 ++++++++++++++-------------------
 1 file changed, 14 insertions(+), 19 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
index ca32041be858c..9590d941088ca 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
@@ -19,7 +19,7 @@ package org.apache.spark.mllib.linalg.distributed
 
 import scala.collection.mutable.ArrayBuffer
 
-import breeze.linalg.{DenseMatrix => BDM}
+import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV}
 
 import org.apache.spark.{Logging, Partitioner, SparkException}
 import org.apache.spark.annotation.Since
@@ -262,31 +262,26 @@ class BlockMatrix @Since("1.3.0") (
     }
     new CoordinateMatrix(entryRDD, numRows(), numCols())
   }
-
+
   /** Converts to IndexedRowMatrix. The number of columns must be within the integer range. */
   @Since("1.3.0")
   def toIndexedRowMatrix(): IndexedRowMatrix = {
     require(numCols() < Int.MaxValue, "The number of columns must be within the integer range. " +
       s"numCols: ${numCols()}")
 
-    val rows = blocks.map(block => (block._1._1, (block._1._2, block._2)))
-      .groupByKey()
-      .flatMap { case (row, matricesItr) =>
-
-        val rows = matricesItr.head._2.numRows
-        val res = BDM.zeros[Double](rows, numCols.toInt)
-
-        matricesItr.foreach { case ((idx: Int, mat: Matrix)) =>
-          val offset = colsPerBlock * idx
-          res(0 until mat.numRows, offset until offset + mat.numCols) := mat.toBreeze
-        }
-
-        (0 until rows).map(idx => new IndexedRow(
-          (row * rowsPerBlock) + idx,
-          Vectors.dense(res.t(::, idx).toArray)
-        ))
+    val rows = blocks.flatMap { case ((blockRowIdx, blockColIdx), mat) =>
+      val dMat = mat.toBreeze.toDenseMatrix.t
+      (0 until mat.numRows).map(rowIds =>
+        blockRowIdx * rowsPerBlock + rowIds -> (blockColIdx, dMat(::, rowIds).toDenseVector)
+      )
+    }.groupByKey().map { case (rowIdx, vectors) =>
+      val wholeVector = BDV.zeros[Double](numCols().toInt)
+      vectors.foreach { case (blockColIdx: Int, vec: BDV[Double]) =>
+        val offset = colsPerBlock * blockColIdx
+        wholeVector(offset until offset + vec.length) := vec
       }
-
+      new IndexedRow(rowIdx, Vectors.fromBreeze(wholeVector))
+    }
     new IndexedRowMatrix(rows)
   }
 
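Note: after the review, the shuffle granularity changes from whole block rows to
single row slices keyed by global row index (blockRowIdx * rowsPerBlock + local
row), so no executor has to hold an entire block row at once. Each block is
still densified through toBreeze.toDenseMatrix; the transpose that follows
exploits Breeze's column-major layout, where a column slice is a cheap view, to
read rows out of a block. A local Breeze sketch of that trick (values are
illustrative):

    import breeze.linalg.{DenseMatrix => BDM}

    // Column-major data: this is the 2 x 3 matrix [[1 2 3], [4 5 6]].
    val block = new BDM(2, 3, Array(1.0, 4.0, 2.0, 5.0, 3.0, 6.0))
    // After transposing, column i of dMat is row i of the original block.
    val dMat = block.t
    println(dMat(::, 0).toDenseVector) // DenseVector(1.0, 2.0, 3.0)
    println(dMat(::, 1).toDenseVector) // DenseVector(4.0, 5.0, 6.0)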
From d3c780d5d73fbb21cb18d5f4ee66cdfd7b1a59f3 Mon Sep 17 00:00:00 2001
From: Fokko Driesprong
Date: Thu, 17 Mar 2016 11:26:05 +0100
Subject: [PATCH 3/4] Updated the code to make use of the Matrix iterator

---
 .../linalg/distributed/BlockMatrix.scala | 23 +++++++++++++++--------
 1 file changed, 15 insertions(+), 8 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
index d2ac037328cee..07cf87028aec0 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
@@ -19,7 +19,7 @@ package org.apache.spark.mllib.linalg.distributed
 
 import scala.collection.mutable.ArrayBuffer
 
-import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, Matrix => BM}
+import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, Matrix => BM, SparseVector => BSV, Vector => BV}
 
 import org.apache.spark.{Logging, Partitioner, SparkException}
 import org.apache.spark.annotation.Since
@@ -263,6 +263,7 @@ class BlockMatrix @Since("1.3.0") (
     new CoordinateMatrix(entryRDD, numRows(), numCols())
   }
 
+
   /** Converts to IndexedRowMatrix. The number of columns must be within the integer range. */
   @Since("1.3.0")
   def toIndexedRowMatrix(): IndexedRowMatrix = {
@@ -270,15 +271,21 @@ class BlockMatrix @Since("1.3.0") (
       s"numCols: ${numCols()}")
 
     val rows = blocks.flatMap { case ((blockRowIdx, blockColIdx), mat) =>
-      val dMat = mat.toBreeze.toDenseMatrix.t
-      (0 until mat.numRows).map(rowIds =>
-        blockRowIdx * rowsPerBlock + rowIds -> (blockColIdx, dMat(::, rowIds).toDenseVector)
-      )
+      mat.rowIter.zipWithIndex.map {
+        case (vector, rowIdx) =>
+          blockRowIdx * rowsPerBlock + rowIdx -> (blockColIdx, vector.toBreeze)
+      }
     }.groupByKey().map { case (rowIdx, vectors) =>
-      val wholeVector = BDV.zeros[Double](numCols().toInt)
-      vectors.foreach { case (blockColIdx: Int, vec: BDV[Double]) =>
+
+      val wholeVector = vectors.head match {
+        case (idx, v: BDV[_]) => BDV.zeros[Double](numCols().toInt)
+        case (idx, v: BSV[_]) => BSV.zeros[Double](numCols().toInt)
+        case _ => throw new SparkException(s"Cannot convert an empty vector to an indexed row")
+      }
+
+      vectors.foreach { case (blockColIdx: Int, vec: BV[Double]) =>
         val offset = colsPerBlock * blockColIdx
-        wholeVector(offset until offset + vec.length) := vec
+        wholeVector(offset until offset + vec.size) := vec
       }
       new IndexedRow(rowIdx, Vectors.fromBreeze(wholeVector))
     }
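Note: iterating with mat.rowIter removes the explicit densification: rows of a
DenseMatrix come out as DenseVectors and rows of a SparseMatrix as
SparseVectors, so a mostly empty block no longer allocates dense storage. The
remaining weak spot is that the accumulator type is chosen from vectors.head
alone, so a row whose first fragment is dense is treated as dense even if every
other fragment is sparse (and vice versa); the next patch replaces this with a
density heuristic. A small local sketch of the type-preserving iterator (values
are illustrative):

    import org.apache.spark.mllib.linalg.{DenseMatrix, SparseMatrix}

    val dense = new DenseMatrix(2, 2, Array(1.0, 2.0, 3.0, 4.0))
    dense.rowIter.foreach(v => println(v.getClass.getSimpleName))  // DenseVector, twice

    // A 2 x 2 CSC matrix with a single nonzero at (0, 0).
    val sparse = new SparseMatrix(2, 2, Array(0, 1, 1), Array(0), Array(5.0))
    sparse.rowIter.foreach(v => println(v.getClass.getSimpleName)) // SparseVector, twice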
" + - s"numCols: ${numCols()}") + val cols = numCols().toInt + + require(cols < Int.MaxValue, s"The number of columns should be less than Int.MaxValue ($cols).") val rows = blocks.flatMap { case ((blockRowIdx, blockColIdx), mat) => mat.rowIter.zipWithIndex.map { @@ -276,16 +277,17 @@ class BlockMatrix @Since("1.3.0") ( blockRowIdx * rowsPerBlock + rowIdx -> (blockColIdx, vector.toBreeze) } }.groupByKey().map { case (rowIdx, vectors) => + val numberNonZeroPerRow = vectors.map(_._2.activeSize).sum.toDouble / cols.toDouble - val wholeVector = vectors.head match { - case (idx, v: BDV[_]) => BDV.zeros[Double](numCols().toInt) - case (idx, v: BSV[_]) => BSV.zeros[Double](numCols().toInt) - case _ => throw new SparkException(s"Cannot convert an empty vector to an indexed row") + val wholeVector = if (numberNonZeroPerRow <= 0.1) { // Sparse at 1/10th nnz + BSV.zeros[Double](cols) + } else { + BDV.zeros[Double](cols) } vectors.foreach { case (blockColIdx: Int, vec: BV[Double]) => val offset = colsPerBlock * blockColIdx - wholeVector(offset until offset + vec.size) := vec + wholeVector(offset until offset + colsPerBlock) := vec } new IndexedRow(rowIdx, Vectors.fromBreeze(wholeVector)) } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala index f737d2c51a262..f37eaf225ab88 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala @@ -19,10 +19,10 @@ package org.apache.spark.mllib.linalg.distributed import java.{util => ju} -import breeze.linalg.{DenseMatrix => BDM} +import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, SparseVector => BSV} import org.apache.spark.{SparkException, SparkFunSuite} -import org.apache.spark.mllib.linalg.{DenseMatrix, Matrices, Matrix, SparseMatrix} +import org.apache.spark.mllib.linalg.{DenseMatrix, DenseVector, Matrices, Matrix, SparseMatrix, SparseVector, Vectors} import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.mllib.util.TestingUtils._ @@ -134,6 +134,33 @@ class BlockMatrixSuite extends SparkFunSuite with MLlibTestSparkContext { assert(rowMat.numRows() === m) assert(rowMat.numCols() === n) assert(rowMat.toBreeze() === gridBasedMat.toBreeze()) + + val rows = 1 + val cols = 10 + + val matDense = new DenseMatrix(rows, cols, + Array(1.0, 1.0, 3.0, 2.0, 5.0, 6.0, 7.0, 1.0, 2.0, 3.0)) + val matSparse = new SparseMatrix(rows, cols, + Array(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1), Array(0), Array(1.0)) + + val vectors: Seq[((Int, Int), Matrix)] = Seq( + ((0, 0), matDense), + ((1, 0), matSparse)) + + val rdd = sc.parallelize(vectors) + val B = new BlockMatrix(rdd, rows, cols) + + val C = B.toIndexedRowMatrix.rows.collect + + (C(0).vector.toBreeze, C(1).vector.toBreeze) match { + case (denseVector: BDV[Double], sparseVector: BSV[Double]) => + assert(denseVector.length === sparseVector.length) + + assert(matDense.toArray === denseVector.toArray) + assert(matSparse.toArray === sparseVector.toArray) + case _ => + throw new RuntimeException("IndexedRow returns vectors of unexpected type") + } } test("toBreeze and toLocalMatrix") {