From e17af2ed46db5fa71ce015e42c8c6ba3ed4f15bf Mon Sep 17 00:00:00 2001 From: Mike Dusenberry Date: Wed, 29 Jul 2015 16:40:11 -0700 Subject: [PATCH 1/7] Adding BlockMatrix to PySpark. --- .../mllib/api/python/PythonMLLibAPI.scala | 25 ++++ python/pyspark/mllib/linalg/__init__.py | 129 +++++++++++++++++- 2 files changed, 152 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index d2b3fae381acb..f585aacd452e0 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -1128,6 +1128,21 @@ private[python] class PythonMLLibAPI extends Serializable { new CoordinateMatrix(entries, numRows, numCols) } + /** + * Wrapper around BlockMatrix constructor. + */ + def createBlockMatrix(blocks: DataFrame, rowsPerBlock: Int, colsPerBlock: Int, + numRows: Long, numCols: Long): BlockMatrix = { + // We use DataFrames for serialization of sub-matrix blocks from + // Python, so map each Row in the DataFrame back to a + // ((blockRowIndex, blockColIndex), sub-matrix) tuple. + val blockTuples = blocks.map { + case Row(Row(blockRowIndex: Long, blockColIndex: Long), subMatrix: Matrix) => + ((blockRowIndex.toInt, blockColIndex.toInt), subMatrix) + } + new BlockMatrix(blockTuples, rowsPerBlock, colsPerBlock, numRows, numCols) + } + /** * Return the rows of an IndexedRowMatrix. */ @@ -1147,6 +1162,16 @@ private[python] class PythonMLLibAPI extends Serializable { val sqlContext = new SQLContext(coordinateMatrix.entries.sparkContext) sqlContext.createDataFrame(coordinateMatrix.entries) } + + /** + * Return the sub-matrix blocks of a BlockMatrix. + */ + def getMatrixBlocks(blockMatrix: BlockMatrix): DataFrame = { + // We use DataFrames for serialization of sub-matrix blocks to + // Python, so return a DataFrame. + val sqlContext = new SQLContext(blockMatrix.blocks.sparkContext) + sqlContext.createDataFrame(blockMatrix.blocks) + } } /** diff --git a/python/pyspark/mllib/linalg/__init__.py b/python/pyspark/mllib/linalg/__init__.py index 334dc8e38bb8f..db3ae96c36acc 100644 --- a/python/pyspark/mllib/linalg/__init__.py +++ b/python/pyspark/mllib/linalg/__init__.py @@ -37,12 +37,15 @@ import numpy as np +from pyspark import RDD +from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper from pyspark.sql.types import UserDefinedType, StructField, StructType, ArrayType, DoubleType, \ IntegerType, ByteType, BooleanType __all__ = ['Vector', 'DenseVector', 'SparseVector', 'Vectors', - 'Matrix', 'DenseMatrix', 'SparseMatrix', 'Matrices'] + 'Matrix', 'DenseMatrix', 'SparseMatrix', 'Matrices', + 'BlockMatrix'] if sys.version_info[:2] == (2, 7): @@ -1152,9 +1155,131 @@ def sparse(numRows, numCols, colPtrs, rowIndices, values): return SparseMatrix(numRows, numCols, colPtrs, rowIndices, values) +def _convert_to_matrix_block_tuple(block): + if isinstance(block, tuple) and len(block) == 2 and len(block[0]) == 2 \ + and isinstance(block[1], Matrix): + blockRowIndex = int(block[0][0]) + blockColIndex = int(block[0][1]) + subMatrix = block[1] + return ((blockRowIndex, blockColIndex), subMatrix) + # return MatrixBlock(blockRowIndex, blockColIndex, subMatrix) + else: + raise TypeError("Cannot convert type %s into a sub-matrix block tuple" % type(block)) + + +class BlockMatrix(object): + """ + .. note:: Experimental + + Represents a distributed matrix in blocks of local matrices. + + :param blocks: The RDD of sub-matrix blocks + ((blockRowIndex, blockColIndex), sub-matrix) that + form this distributed matrix. If multiple blocks + with the same index exist, the results for + operations like add and multiply will be + unpredictable. + :param rowsPerBlock: Number of rows that make up each block. + The blocks forming the final rows are not + required to have the given number of rows. + :param colsPerBlock: Number of columns that make up each block. + The blocks forming the final columns are not + required to have the given number of columns. + :param numRows: Number of rows of this matrix. If the supplied + value is less than or equal to zero, the number + of rows will be calculated when `numRows` is + invoked. + :param numCols: Number of columns of this matrix. If the supplied + value is less than or equal to zero, the number + of columns will be calculated when `numCols` is + invoked. + """ + def __init__(self, blocks, rowsPerBlock, colsPerBlock, numRows=0, numCols=0): + """Create a wrapper over a Java BlockMatrix.""" + if not isinstance(blocks, RDD): + raise TypeError("entries should be an RDD of sub-matrix blocks as" + "((int, int), matrix) tuples, got %s" % type(blocks)) + blocks = blocks.map(_convert_to_matrix_block_tuple) + + # We use DataFrames for serialization of sub-matrix blocks + # from Python, so first convert the RDD to a DataFrame on this + # side. We will convert back to + # ((blockRowIndex, blockColIndex), sub-matrix) tuples on the + # Scala side. + javaBlockMatrix = callMLlibFunc("createBlockMatrix", blocks.toDF(), + int(rowsPerBlock), int(colsPerBlock), + long(numRows), long(numCols)) + self._jbm = JavaModelWrapper(javaBlockMatrix) + self.blocks = blocks + + def numRows(self): + """ + Get or compute the number of rows. + + >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])), + ... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))]) + + >>> bm = BlockMatrix(blocks, 3, 2) + >>> print(bm.numRows()) + 6 + + >>> bm = BlockMatrix(blocks, 3, 2, 7, 6) + >>> print(bm.numRows()) + 7 + """ + return self._jbm.call("numRows") + + def numCols(self): + """ + Get or compute the number of cols. + + >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])), + ... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))]) + + >>> bm = BlockMatrix(blocks, 3, 2) + >>> print(bm.numCols()) + 2 + + >>> bm = BlockMatrix(blocks, 3, 2, 7, 6) + >>> print(bm.numCols()) + 6 + """ + return self._jbm.call("numCols") + + def toLocalMatrix(self): + """ + Collect the distributed matrix on the driver as a DenseMatrix. + + >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])), + ... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))]) + + >>> # This BlockMatrix will have 6 effective rows, due to + >>> # having two sub-matrix blocks stacked, each with 3 rows. + >>> # The ensuing DenseMatrix will also have 6 rows. + >>> mat = BlockMatrix(blocks, 3, 2).toLocalMatrix() + >>> print(mat.numRows) + 6 + + >>> # This BlockMatrix will have 2 effective columns, due to + >>> # having two sub-matrix blocks stacked, each with 2 + >>> # columns. The ensuing DenseMatrix will also have 2 columns. + >>> mat = BlockMatrix(blocks, 3, 2).toLocalMatrix() + >>> print(mat.numCols) + 2 + """ + return self._jbm.call("toLocalMatrix") + + def _test(): import doctest - (failure_count, test_count) = doctest.testmod(optionflags=doctest.ELLIPSIS) + from pyspark import SparkContext + from pyspark.sql import SQLContext + import pyspark.mllib.linalg + globs = pyspark.mllib.linalg.__dict__.copy() + globs['sc'] = SparkContext('local[2]', 'PythonTest', batchSize=2) + globs['sqlContext'] = SQLContext(globs['sc']) + (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) + globs['sc'].stop() if failure_count: exit(-1) From 8fb3095909ac09b97490d1c635d3cfb2fb62cc78 Mon Sep 17 00:00:00 2001 From: Mike Dusenberry Date: Wed, 29 Jul 2015 16:46:42 -0700 Subject: [PATCH 2/7] Small cleanup. --- python/pyspark/mllib/linalg/__init__.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/pyspark/mllib/linalg/__init__.py b/python/pyspark/mllib/linalg/__init__.py index db3ae96c36acc..834af89708fb8 100644 --- a/python/pyspark/mllib/linalg/__init__.py +++ b/python/pyspark/mllib/linalg/__init__.py @@ -1162,7 +1162,6 @@ def _convert_to_matrix_block_tuple(block): blockColIndex = int(block[0][1]) subMatrix = block[1] return ((blockRowIndex, blockColIndex), subMatrix) - # return MatrixBlock(blockRowIndex, blockColIndex, subMatrix) else: raise TypeError("Cannot convert type %s into a sub-matrix block tuple" % type(block)) From 3bda6ab6c098da9b5d877391f41bffc4d047b497 Mon Sep 17 00:00:00 2001 From: Mike Dusenberry Date: Wed, 29 Jul 2015 18:06:37 -0700 Subject: [PATCH 3/7] Adding documentation. --- docs/mllib-data-types.md | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/docs/mllib-data-types.md b/docs/mllib-data-types.md index 11033bf4f9c37..6bed3f935a0d6 100644 --- a/docs/mllib-data-types.md +++ b/docs/mllib-data-types.md @@ -661,4 +661,33 @@ matA.validate(); BlockMatrix ata = matA.transpose().multiply(matA); {% endhighlight %} + + +
+ +A [`BlockMatrix`](api/python/pyspark.mllib.html#pyspark.mllib.linalg.BlockMatrix) can be created +from an `RDD` of sub-matrix blocks, where a sub-matrix block is a +`((blockRowIndex, blockColIndex), sub-matrix)` tuple. + +{% highlight python %} +from pyspark.mllib.linalg import BlockMatrix, Matrices + +# Create an RDD of sub-matrix blocks. +blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])), + ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))]) + +# Create a BlockMatrix from an RDD of sub-matrix blocks. +mat = BlockMatrix(blocks, 3, 2) + +# Get its size. +m = mat.numRows() # 6 +n = mat.numCols() # 2 + +# Get the blocks as an RDD of sub-matrix blocks. +blocksRDD = mat.blocks + +# Convert to a LocalMatrix. +localMat = mat.toLocalMatrix() +{% endhighlight %} +
From c014002fc8b8f555391057553b87ca2a0522d6c1 Mon Sep 17 00:00:00 2001 From: Mike Dusenberry Date: Wed, 29 Jul 2015 19:01:07 -0700 Subject: [PATCH 4/7] Using properties for better documentation. --- python/pyspark/mllib/linalg/__init__.py | 49 ++++++++++++++++++++++++- 1 file changed, 48 insertions(+), 1 deletion(-) diff --git a/python/pyspark/mllib/linalg/__init__.py b/python/pyspark/mllib/linalg/__init__.py index 834af89708fb8..03e873f89089d 100644 --- a/python/pyspark/mllib/linalg/__init__.py +++ b/python/pyspark/mllib/linalg/__init__.py @@ -1209,7 +1209,54 @@ def __init__(self, blocks, rowsPerBlock, colsPerBlock, numRows=0, numCols=0): int(rowsPerBlock), int(colsPerBlock), long(numRows), long(numCols)) self._jbm = JavaModelWrapper(javaBlockMatrix) - self.blocks = blocks + self._blocks = blocks + self._rowsPerBlock = rowsPerBlock + self._colsPerBlock = colsPerBlock + + @property + def blocks(self): + """ + The RDD of sub-matrix blocks + ((blockRowIndex, blockColIndex), sub-matrix) that form this + distributed matrix. + """ + return self._blocks + + @property + def rowsPerBlock(self): + """Number of rows that make up each block.""" + return self._rowsPerBlock + + @property + def colsPerBlock(self): + """Number of columns that make up each block.""" + return self._colsPerBlock + + @property + def numRowBlocks(self): + """ + Number of rows of blocks in the BlockMatrix. + + >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])), + ... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))]) + >>> bm = BlockMatrix(blocks, 3, 2) + >>> print(bm.numRowBlocks) + 2 + """ + return self._jbm.call("numRowBlocks") + + @property + def numColBlocks(self): + """ + Number of columns of blocks in the BlockMatrix. + + >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])), + ... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))]) + >>> bm = BlockMatrix(blocks, 3, 2) + >>> print(bm.numColBlocks) + 1 + """ + return self._jbm.call("numColBlocks") def numRows(self): """ From b8acc1cfaa0506d94c6d63931f7a382e8610e354 Mon Sep 17 00:00:00 2001 From: Mike Dusenberry Date: Tue, 4 Aug 2015 19:24:24 -0700 Subject: [PATCH 5/7] Moving BlockMatrix to pyspark.mllib.linalg.distributed, updating the logic to match that of the other distributed matrices, adding conversions, and adding documentation. --- docs/mllib-data-types.md | 20 +- python/pyspark/mllib/linalg/__init__.py | 175 +---------- python/pyspark/mllib/linalg/distributed.py | 329 ++++++++++++++++++++- 3 files changed, 341 insertions(+), 183 deletions(-) diff --git a/docs/mllib-data-types.md b/docs/mllib-data-types.md index 6bed3f935a0d6..f0e8d5495675d 100644 --- a/docs/mllib-data-types.md +++ b/docs/mllib-data-types.md @@ -494,6 +494,9 @@ rowMat = mat.toRowMatrix() # Convert to a CoordinateMatrix. coordinateMat = mat.toCoordinateMatrix() + +# Convert to a BlockMatrix. +blockMat = mat.toBlockMatrix() {% endhighlight %} @@ -594,6 +597,9 @@ rowMat = mat.toRowMatrix() # Convert to an IndexedRowMatrix. indexedRowMat = mat.toIndexedRowMatrix() + +# Convert to a BlockMatrix. +blockMat = mat.toBlockMatrix() {% endhighlight %} @@ -662,15 +668,15 @@ BlockMatrix ata = matA.transpose().multiply(matA); {% endhighlight %} -
-A [`BlockMatrix`](api/python/pyspark.mllib.html#pyspark.mllib.linalg.BlockMatrix) can be created -from an `RDD` of sub-matrix blocks, where a sub-matrix block is a +A [`BlockMatrix`](api/python/pyspark.mllib.html#pyspark.mllib.linalg.distributed.BlockMatrix) +can be created from an `RDD` of sub-matrix blocks, where a sub-matrix block is a `((blockRowIndex, blockColIndex), sub-matrix)` tuple. {% highlight python %} -from pyspark.mllib.linalg import BlockMatrix, Matrices +from pyspark.mllib.linalg import Matrices +from pyspark.mllib.linalg.distributed import BlockMatrix # Create an RDD of sub-matrix blocks. blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])), @@ -688,6 +694,12 @@ blocksRDD = mat.blocks # Convert to a LocalMatrix. localMat = mat.toLocalMatrix() + +# Convert to an IndexedRowMatrix. +indexedRowMat = mat.toIndexedRowMatrix() + +# Convert to a CoordinateMatrix. +coordinateMat = mat.toCoordinateMatrix() {% endhighlight %}
diff --git a/python/pyspark/mllib/linalg/__init__.py b/python/pyspark/mllib/linalg/__init__.py index 03e873f89089d..334dc8e38bb8f 100644 --- a/python/pyspark/mllib/linalg/__init__.py +++ b/python/pyspark/mllib/linalg/__init__.py @@ -37,15 +37,12 @@ import numpy as np -from pyspark import RDD -from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper from pyspark.sql.types import UserDefinedType, StructField, StructType, ArrayType, DoubleType, \ IntegerType, ByteType, BooleanType __all__ = ['Vector', 'DenseVector', 'SparseVector', 'Vectors', - 'Matrix', 'DenseMatrix', 'SparseMatrix', 'Matrices', - 'BlockMatrix'] + 'Matrix', 'DenseMatrix', 'SparseMatrix', 'Matrices'] if sys.version_info[:2] == (2, 7): @@ -1155,177 +1152,9 @@ def sparse(numRows, numCols, colPtrs, rowIndices, values): return SparseMatrix(numRows, numCols, colPtrs, rowIndices, values) -def _convert_to_matrix_block_tuple(block): - if isinstance(block, tuple) and len(block) == 2 and len(block[0]) == 2 \ - and isinstance(block[1], Matrix): - blockRowIndex = int(block[0][0]) - blockColIndex = int(block[0][1]) - subMatrix = block[1] - return ((blockRowIndex, blockColIndex), subMatrix) - else: - raise TypeError("Cannot convert type %s into a sub-matrix block tuple" % type(block)) - - -class BlockMatrix(object): - """ - .. note:: Experimental - - Represents a distributed matrix in blocks of local matrices. - - :param blocks: The RDD of sub-matrix blocks - ((blockRowIndex, blockColIndex), sub-matrix) that - form this distributed matrix. If multiple blocks - with the same index exist, the results for - operations like add and multiply will be - unpredictable. - :param rowsPerBlock: Number of rows that make up each block. - The blocks forming the final rows are not - required to have the given number of rows. - :param colsPerBlock: Number of columns that make up each block. - The blocks forming the final columns are not - required to have the given number of columns. - :param numRows: Number of rows of this matrix. If the supplied - value is less than or equal to zero, the number - of rows will be calculated when `numRows` is - invoked. - :param numCols: Number of columns of this matrix. If the supplied - value is less than or equal to zero, the number - of columns will be calculated when `numCols` is - invoked. - """ - def __init__(self, blocks, rowsPerBlock, colsPerBlock, numRows=0, numCols=0): - """Create a wrapper over a Java BlockMatrix.""" - if not isinstance(blocks, RDD): - raise TypeError("entries should be an RDD of sub-matrix blocks as" - "((int, int), matrix) tuples, got %s" % type(blocks)) - blocks = blocks.map(_convert_to_matrix_block_tuple) - - # We use DataFrames for serialization of sub-matrix blocks - # from Python, so first convert the RDD to a DataFrame on this - # side. We will convert back to - # ((blockRowIndex, blockColIndex), sub-matrix) tuples on the - # Scala side. - javaBlockMatrix = callMLlibFunc("createBlockMatrix", blocks.toDF(), - int(rowsPerBlock), int(colsPerBlock), - long(numRows), long(numCols)) - self._jbm = JavaModelWrapper(javaBlockMatrix) - self._blocks = blocks - self._rowsPerBlock = rowsPerBlock - self._colsPerBlock = colsPerBlock - - @property - def blocks(self): - """ - The RDD of sub-matrix blocks - ((blockRowIndex, blockColIndex), sub-matrix) that form this - distributed matrix. - """ - return self._blocks - - @property - def rowsPerBlock(self): - """Number of rows that make up each block.""" - return self._rowsPerBlock - - @property - def colsPerBlock(self): - """Number of columns that make up each block.""" - return self._colsPerBlock - - @property - def numRowBlocks(self): - """ - Number of rows of blocks in the BlockMatrix. - - >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])), - ... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))]) - >>> bm = BlockMatrix(blocks, 3, 2) - >>> print(bm.numRowBlocks) - 2 - """ - return self._jbm.call("numRowBlocks") - - @property - def numColBlocks(self): - """ - Number of columns of blocks in the BlockMatrix. - - >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])), - ... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))]) - >>> bm = BlockMatrix(blocks, 3, 2) - >>> print(bm.numColBlocks) - 1 - """ - return self._jbm.call("numColBlocks") - - def numRows(self): - """ - Get or compute the number of rows. - - >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])), - ... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))]) - - >>> bm = BlockMatrix(blocks, 3, 2) - >>> print(bm.numRows()) - 6 - - >>> bm = BlockMatrix(blocks, 3, 2, 7, 6) - >>> print(bm.numRows()) - 7 - """ - return self._jbm.call("numRows") - - def numCols(self): - """ - Get or compute the number of cols. - - >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])), - ... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))]) - - >>> bm = BlockMatrix(blocks, 3, 2) - >>> print(bm.numCols()) - 2 - - >>> bm = BlockMatrix(blocks, 3, 2, 7, 6) - >>> print(bm.numCols()) - 6 - """ - return self._jbm.call("numCols") - - def toLocalMatrix(self): - """ - Collect the distributed matrix on the driver as a DenseMatrix. - - >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])), - ... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))]) - - >>> # This BlockMatrix will have 6 effective rows, due to - >>> # having two sub-matrix blocks stacked, each with 3 rows. - >>> # The ensuing DenseMatrix will also have 6 rows. - >>> mat = BlockMatrix(blocks, 3, 2).toLocalMatrix() - >>> print(mat.numRows) - 6 - - >>> # This BlockMatrix will have 2 effective columns, due to - >>> # having two sub-matrix blocks stacked, each with 2 - >>> # columns. The ensuing DenseMatrix will also have 2 columns. - >>> mat = BlockMatrix(blocks, 3, 2).toLocalMatrix() - >>> print(mat.numCols) - 2 - """ - return self._jbm.call("toLocalMatrix") - - def _test(): import doctest - from pyspark import SparkContext - from pyspark.sql import SQLContext - import pyspark.mllib.linalg - globs = pyspark.mllib.linalg.__dict__.copy() - globs['sc'] = SparkContext('local[2]', 'PythonTest', batchSize=2) - globs['sqlContext'] = SQLContext(globs['sc']) - (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) - globs['sc'].stop() + (failure_count, test_count) = doctest.testmod(optionflags=doctest.ELLIPSIS) if failure_count: exit(-1) diff --git a/python/pyspark/mllib/linalg/distributed.py b/python/pyspark/mllib/linalg/distributed.py index 666d833019562..a53396983ad6f 100644 --- a/python/pyspark/mllib/linalg/distributed.py +++ b/python/pyspark/mllib/linalg/distributed.py @@ -28,11 +28,12 @@ from pyspark import RDD from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper -from pyspark.mllib.linalg import _convert_to_vector +from pyspark.mllib.linalg import _convert_to_vector, Matrix __all__ = ['DistributedMatrix', 'RowMatrix', 'IndexedRow', - 'IndexedRowMatrix', 'MatrixEntry', 'CoordinateMatrix'] + 'IndexedRowMatrix', 'MatrixEntry', 'CoordinateMatrix', + 'BlockMatrix'] class DistributedMatrix(object): @@ -322,6 +323,37 @@ def toCoordinateMatrix(self): java_coordinate_matrix = self._java_matrix_wrapper.call("toCoordinateMatrix") return CoordinateMatrix(java_coordinate_matrix) + def toBlockMatrix(self, rowsPerBlock=1024, colsPerBlock=1024): + """ + Convert this matrix to BlockMatrix. + + :param rowsPerBlock: Number of rows that make up each block. + The blocks forming the final rows are not + required to have the given number of rows. + :param colsPerBlock: Number of columns that make up each block. + The blocks forming the final columns are not + required to have the given number of columns. + + >>> rows = sc.parallelize([IndexedRow(0, [1, 2, 3]), + ... IndexedRow(6, [4, 5, 6])]) + >>> mat = IndexedRowMatrix(rows).toBlockMatrix() + + >>> # This IndexedRowMatrix will have 7 effective rows, due to + >>> # the highest row index being 6, and the ensuing + >>> # BlockMatrix will have 7 rows as well. + >>> print(mat.numRows()) + 7 + + >>> # This IndexedRowMatrix will have 3 columns, and the + >>> # ensuing BlockMatrix will have 3 columns as well. + >>> print(mat.numCols()) + 3 + """ + java_block_matrix = self._java_matrix_wrapper.call("toBlockMatrix", + rowsPerBlock, + colsPerBlock) + return BlockMatrix(java_block_matrix, rowsPerBlock, colsPerBlock) + class MatrixEntry(object): """ @@ -476,19 +508,18 @@ def toRowMatrix(self): >>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2), ... MatrixEntry(6, 4, 2.1)]) + >>> mat = CoordinateMatrix(entries).toRowMatrix() >>> # This CoordinateMatrix will have 7 effective rows, due to >>> # the highest row index being 6, but the ensuing RowMatrix >>> # will only have 2 rows since there are only entries on 2 >>> # unique rows. - >>> mat = CoordinateMatrix(entries).toRowMatrix() >>> print(mat.numRows()) 2 >>> # This CoordinateMatrix will have 5 columns, due to the >>> # highest column index being 4, and the ensuing RowMatrix >>> # will have 5 columns as well. - >>> mat = CoordinateMatrix(entries).toRowMatrix() >>> print(mat.numCols()) 5 """ @@ -501,33 +532,319 @@ def toIndexedRowMatrix(self): >>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2), ... MatrixEntry(6, 4, 2.1)]) + >>> mat = CoordinateMatrix(entries).toIndexedRowMatrix() >>> # This CoordinateMatrix will have 7 effective rows, due to >>> # the highest row index being 6, and the ensuing >>> # IndexedRowMatrix will have 7 rows as well. - >>> mat = CoordinateMatrix(entries).toIndexedRowMatrix() >>> print(mat.numRows()) 7 >>> # This CoordinateMatrix will have 5 columns, due to the >>> # highest column index being 4, and the ensuing >>> # IndexedRowMatrix will have 5 columns as well. - >>> mat = CoordinateMatrix(entries).toIndexedRowMatrix() >>> print(mat.numCols()) 5 """ java_indexed_row_matrix = self._java_matrix_wrapper.call("toIndexedRowMatrix") return IndexedRowMatrix(java_indexed_row_matrix) + def toBlockMatrix(self, rowsPerBlock=1024, colsPerBlock=1024): + """ + Convert this matrix to BlockMatrix. + + :param rowsPerBlock: Number of rows that make up each block. + The blocks forming the final rows are not + required to have the given number of rows. + :param colsPerBlock: Number of columns that make up each block. + The blocks forming the final columns are not + required to have the given number of columns. + + >>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2), + ... MatrixEntry(6, 4, 2.1)]) + >>> mat = CoordinateMatrix(entries).toBlockMatrix() + + >>> # This CoordinateMatrix will have 7 effective rows, due to + >>> # the highest row index being 6, and the ensuing + >>> # BlockMatrix will have 7 rows as well. + >>> print(mat.numRows()) + 7 + + >>> # This CoordinateMatrix will have 5 columns, due to the + >>> # highest column index being 4, and the ensuing + >>> # BlockMatrix will have 5 columns as well. + >>> print(mat.numCols()) + 5 + """ + java_block_matrix = self._java_matrix_wrapper.call("toBlockMatrix", + rowsPerBlock, + colsPerBlock) + return BlockMatrix(java_block_matrix, rowsPerBlock, colsPerBlock) + + +def _convert_to_matrix_block_tuple(block): + if isinstance(block, tuple) and len(block) == 2 and len(block[0]) == 2 \ + and isinstance(block[1], Matrix): + blockRowIndex = int(block[0][0]) + blockColIndex = int(block[0][1]) + subMatrix = block[1] + return ((blockRowIndex, blockColIndex), subMatrix) + else: + raise TypeError("Cannot convert type %s into a sub-matrix block tuple" % type(block)) + + +class BlockMatrix(object): + """ + .. note:: Experimental + + Represents a distributed matrix in blocks of local matrices. + + :param blocks: An RDD of sub-matrix blocks + ((blockRowIndex, blockColIndex), sub-matrix) that + form this distributed matrix. If multiple blocks + with the same index exist, the results for + operations like add and multiply will be + unpredictable. + :param rowsPerBlock: Number of rows that make up each block. + The blocks forming the final rows are not + required to have the given number of rows. + :param colsPerBlock: Number of columns that make up each block. + The blocks forming the final columns are not + required to have the given number of columns. + :param numRows: Number of rows of this matrix. If the supplied + value is less than or equal to zero, the number + of rows will be calculated when `numRows` is + invoked. + :param numCols: Number of columns of this matrix. If the supplied + value is less than or equal to zero, the number + of columns will be calculated when `numCols` is + invoked. + """ + def __init__(self, blocks, rowsPerBlock, colsPerBlock, numRows=0, numCols=0): + """ + Note: This docstring is not shown publicly. + + Create a wrapper over a Java BlockMatrix. + + Publicly, we require that `blocks` be an RDD. However, for + internal usage, `blocks` can also be a Java BlockMatrix + object, in which case we can wrap it directly. This + assists in clean matrix conversions. + + >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])), + ... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))]) + >>> mat = BlockMatrix(blocks, 3, 2) + + >>> mat_diff = BlockMatrix(blocks, 3, 2) + >>> (mat_diff._java_matrix_wrapper._java_model == + ... mat._java_matrix_wrapper._java_model) + False + + >>> mat_same = BlockMatrix(mat._java_matrix_wrapper._java_model, 3, 2) + >>> (mat_same._java_matrix_wrapper._java_model == + ... mat._java_matrix_wrapper._java_model) + True + """ + if isinstance(blocks, RDD): + blocks = blocks.map(_convert_to_matrix_block_tuple) + # We use DataFrames for serialization of sub-matrix blocks + # from Python, so first convert the RDD to a DataFrame on + # this side. This will convert each sub-matrix block + # tuple to a Row containing the 'blockRowIndex', + # 'blockColIndex', and 'subMatrix' values, which can + # each be easily serialized. We will convert back to + # ((blockRowIndex, blockColIndex), sub-matrix) tuples on + # the Scala side. + java_matrix = callMLlibFunc("createBlockMatrix", blocks.toDF(), + int(rowsPerBlock), int(colsPerBlock), + long(numRows), long(numCols)) + elif (isinstance(blocks, JavaObject) + and blocks.getClass().getSimpleName() == "BlockMatrix"): + java_matrix = blocks + else: + raise TypeError("blocks should be an RDD of sub-matrix blocks as" + "((int, int), matrix) tuples, got %s" % type(blocks)) + + self._java_matrix_wrapper = JavaModelWrapper(java_matrix) + + @property + def blocks(self): + """ + The RDD of sub-matrix blocks + ((blockRowIndex, blockColIndex), sub-matrix) that form this + distributed matrix. + + >>> mat = BlockMatrix( + ... sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])), + ... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))]), 3, 2) + >>> blocks = mat.blocks + >>> blocks.first() + ((0, 0), DenseMatrix(3, 2, [1.0, 2.0, 3.0, 4.0, 5.0, 6.0], 0)) + + """ + # We use DataFrames for serialization of sub-matrix blocks + # from Java, so we first convert the RDD of blocks to a + # DataFrame on the Scala/Java side. Then we map each Row in + # the DataFrame back to a sub-matrix block on this side. + blocks_df = callMLlibFunc("getMatrixBlocks", self._java_matrix_wrapper._java_model) + blocks = blocks_df.map(lambda row: ((row[0][0], row[0][1]), row[1])) + return blocks + + @property + def rowsPerBlock(self): + """ + Number of rows that make up each block. + + >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])), + ... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))]) + >>> mat = BlockMatrix(blocks, 3, 2) + >>> mat.rowsPerBlock + 3 + """ + return self._java_matrix_wrapper.call("rowsPerBlock") + + @property + def colsPerBlock(self): + """ + Number of columns that make up each block. + + >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])), + ... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))]) + >>> mat = BlockMatrix(blocks, 3, 2) + >>> mat.colsPerBlock + 2 + """ + return self._java_matrix_wrapper.call("colsPerBlock") + + @property + def numRowBlocks(self): + """ + Number of rows of blocks in the BlockMatrix. + + >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])), + ... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))]) + >>> mat = BlockMatrix(blocks, 3, 2) + >>> mat.numRowBlocks + 2 + """ + return self._java_matrix_wrapper.call("numRowBlocks") + + @property + def numColBlocks(self): + """ + Number of columns of blocks in the BlockMatrix. + + >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])), + ... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))]) + >>> mat = BlockMatrix(blocks, 3, 2) + >>> mat.numColBlocks + 1 + """ + return self._java_matrix_wrapper.call("numColBlocks") + + def numRows(self): + """ + Get or compute the number of rows. + + >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])), + ... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))]) + + >>> mat = BlockMatrix(blocks, 3, 2) + >>> print(mat.numRows()) + 6 + + >>> mat = BlockMatrix(blocks, 3, 2, 7, 6) + >>> print(mat.numRows()) + 7 + """ + return self._java_matrix_wrapper.call("numRows") + + def numCols(self): + """ + Get or compute the number of cols. + + >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])), + ... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))]) + + >>> mat = BlockMatrix(blocks, 3, 2) + >>> print(mat.numCols()) + 2 + + >>> mat = BlockMatrix(blocks, 3, 2, 7, 6) + >>> print(mat.numCols()) + 6 + """ + return self._java_matrix_wrapper.call("numCols") + + def toLocalMatrix(self): + """ + Collect the distributed matrix on the driver as a DenseMatrix. + + >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])), + ... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))]) + >>> mat = BlockMatrix(blocks, 3, 2).toLocalMatrix() + + >>> # This BlockMatrix will have 6 effective rows, due to + >>> # having two sub-matrix blocks stacked, each with 3 rows. + >>> # The ensuing DenseMatrix will also have 6 rows. + >>> print(mat.numRows) + 6 + + >>> # This BlockMatrix will have 2 effective columns, due to + >>> # having two sub-matrix blocks stacked, each with 2 + >>> # columns. The ensuing DenseMatrix will also have 2 columns. + >>> print(mat.numCols) + 2 + """ + return self._java_matrix_wrapper.call("toLocalMatrix") + + def toIndexedRowMatrix(self): + """ + Convert this matrix to an IndexedRowMatrix. + + >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])), + ... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))]) + >>> mat = BlockMatrix(blocks, 3, 2).toIndexedRowMatrix() + + >>> # This BlockMatrix will have 6 effective rows, due to + >>> # having two sub-matrix blocks stacked, each with 3 rows. + >>> # The ensuing IndexedRowMatrix will also have 6 rows. + >>> print(mat.numRows()) + 6 + + >>> # This BlockMatrix will have 2 effective columns, due to + >>> # having two sub-matrix blocks stacked, each with 2 columns. + >>> # The ensuing IndexedRowMatrix will also have 2 columns. + >>> print(mat.numCols()) + 2 + """ + java_indexed_row_matrix = self._java_matrix_wrapper.call("toIndexedRowMatrix") + return IndexedRowMatrix(java_indexed_row_matrix) + + def toCoordinateMatrix(self): + """ + Convert this matrix to a CoordinateMatrix. + + >>> blocks = sc.parallelize([((0, 0), Matrices.dense(1, 2, [1, 2])), + ... ((1, 0), Matrices.dense(1, 2, [7, 8]))]) + >>> mat = BlockMatrix(blocks, 1, 2).toCoordinateMatrix() + >>> mat.entries.take(3) + [MatrixEntry(0, 0, 1.0), MatrixEntry(0, 1, 2.0), MatrixEntry(1, 0, 7.0)] + """ + java_coordinate_matrix = self._java_matrix_wrapper.call("toCoordinateMatrix") + return CoordinateMatrix(java_coordinate_matrix) + def _test(): import doctest from pyspark import SparkContext from pyspark.sql import SQLContext + from pyspark.mllib.linalg import Matrices import pyspark.mllib.linalg.distributed globs = pyspark.mllib.linalg.distributed.__dict__.copy() globs['sc'] = SparkContext('local[2]', 'PythonTest', batchSize=2) globs['sqlContext'] = SQLContext(globs['sc']) + globs['Matrices'] = Matrices (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) globs['sc'].stop() if failure_count: From ae5088307c4ce822aa813c9b83ce707224eab46d Mon Sep 17 00:00:00 2001 From: Mike Dusenberry Date: Tue, 4 Aug 2015 23:09:14 -0700 Subject: [PATCH 6/7] Minor update: BlockMatrix should inherit from DistributedMatrix. --- python/pyspark/mllib/linalg/distributed.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/mllib/linalg/distributed.py b/python/pyspark/mllib/linalg/distributed.py index a53396983ad6f..e5a5940905dac 100644 --- a/python/pyspark/mllib/linalg/distributed.py +++ b/python/pyspark/mllib/linalg/distributed.py @@ -593,7 +593,7 @@ def _convert_to_matrix_block_tuple(block): raise TypeError("Cannot convert type %s into a sub-matrix block tuple" % type(block)) -class BlockMatrix(object): +class BlockMatrix(DistributedMatrix): """ .. note:: Experimental From 27195c236b51d862039905522e317ebc6dc75d7d Mon Sep 17 00:00:00 2001 From: Mike Dusenberry Date: Tue, 4 Aug 2015 23:45:02 -0700 Subject: [PATCH 7/7] Adding one more check to _convert_to_matrix_block_tuple, and a few minor documentation changes. --- python/pyspark/mllib/linalg/distributed.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/python/pyspark/mllib/linalg/distributed.py b/python/pyspark/mllib/linalg/distributed.py index e5a5940905dac..aec407de90aa3 100644 --- a/python/pyspark/mllib/linalg/distributed.py +++ b/python/pyspark/mllib/linalg/distributed.py @@ -325,7 +325,7 @@ def toCoordinateMatrix(self): def toBlockMatrix(self, rowsPerBlock=1024, colsPerBlock=1024): """ - Convert this matrix to BlockMatrix. + Convert this matrix to a BlockMatrix. :param rowsPerBlock: Number of rows that make up each block. The blocks forming the final rows are not @@ -344,8 +344,6 @@ def toBlockMatrix(self, rowsPerBlock=1024, colsPerBlock=1024): >>> print(mat.numRows()) 7 - >>> # This IndexedRowMatrix will have 3 columns, and the - >>> # ensuing BlockMatrix will have 3 columns as well. >>> print(mat.numCols()) 3 """ @@ -551,7 +549,7 @@ def toIndexedRowMatrix(self): def toBlockMatrix(self, rowsPerBlock=1024, colsPerBlock=1024): """ - Convert this matrix to BlockMatrix. + Convert this matrix to a BlockMatrix. :param rowsPerBlock: Number of rows that make up each block. The blocks forming the final rows are not @@ -583,8 +581,9 @@ def toBlockMatrix(self, rowsPerBlock=1024, colsPerBlock=1024): def _convert_to_matrix_block_tuple(block): - if isinstance(block, tuple) and len(block) == 2 and len(block[0]) == 2 \ - and isinstance(block[1], Matrix): + if (isinstance(block, tuple) and len(block) == 2 + and isinstance(block[0], tuple) and len(block[0]) == 2 + and isinstance(block[1], Matrix)): blockRowIndex = int(block[0][0]) blockColIndex = int(block[0][1]) subMatrix = block[1] @@ -662,7 +661,7 @@ def __init__(self, blocks, rowsPerBlock, colsPerBlock, numRows=0, numCols=0): and blocks.getClass().getSimpleName() == "BlockMatrix"): java_matrix = blocks else: - raise TypeError("blocks should be an RDD of sub-matrix blocks as" + raise TypeError("blocks should be an RDD of sub-matrix blocks as " "((int, int), matrix) tuples, got %s" % type(blocks)) self._java_matrix_wrapper = JavaModelWrapper(java_matrix)