From 3f75ab2f69667009afc55a9c985c6fa84e8ba04f Mon Sep 17 00:00:00 2001 From: harupy <17039389+harupy@users.noreply.github.com> Date: Tue, 20 Apr 2021 11:30:08 +0900 Subject: [PATCH 1/8] specify return type for rawPredictionUDF --- python/pyspark/ml/classification.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py index 0553a61c6c771..17994ed5e3d28 100644 --- a/python/pyspark/ml/classification.py +++ b/python/pyspark/ml/classification.py @@ -40,7 +40,7 @@ from pyspark.ml.wrapper import JavaParams, \ JavaPredictor, JavaPredictionModel, JavaWrapper from pyspark.ml.common import inherit_doc -from pyspark.ml.linalg import Vectors +from pyspark.ml.linalg import Vectors, VectorUDT from pyspark.sql import DataFrame from pyspark.sql.functions import udf, when from pyspark.sql.types import ArrayType, DoubleType @@ -3151,7 +3151,7 @@ def func(predictions): predArray.append(x) return Vectors.dense(predArray) - rawPredictionUDF = udf(func) + rawPredictionUDF = udf(func, VectorUDT()) aggregatedDataset = aggregatedDataset.withColumn( self.getRawPredictionCol(), rawPredictionUDF(aggregatedDataset[accColName])) From 383c84fae8430c34d4c7d245e68af0df685a9fff Mon Sep 17 00:00:00 2001 From: harupy <17039389+harupy@users.noreply.github.com> Date: Tue, 20 Apr 2021 12:24:04 +0900 Subject: [PATCH 2/8] Add test --- python/pyspark/ml/tests/test_algorithms.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/pyspark/ml/tests/test_algorithms.py b/python/pyspark/ml/tests/test_algorithms.py index 50475210607c8..f5069e2149390 100644 --- a/python/pyspark/ml/tests/test_algorithms.py +++ b/python/pyspark/ml/tests/test_algorithms.py @@ -115,6 +115,7 @@ def test_output_columns(self): model = ovr.fit(df) output = model.transform(df) self.assertEqual(output.columns, ["label", "features", "rawPrediction", "prediction"]) + self.assertIsInstance(pred.schema["rawPrediction"].dataType, VectorUDT) def test_parallelism_does_not_change_output(self): df = self.spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)), From 5e05b5053dc9c84f4ae10b8804e07e2041a2c321 Mon Sep 17 00:00:00 2001 From: harupy <17039389+harupy@users.noreply.github.com> Date: Tue, 20 Apr 2021 12:24:27 +0900 Subject: [PATCH 3/8] Fix incorrect variable name --- python/pyspark/ml/tests/test_algorithms.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/ml/tests/test_algorithms.py b/python/pyspark/ml/tests/test_algorithms.py index f5069e2149390..bc8f8dffc8139 100644 --- a/python/pyspark/ml/tests/test_algorithms.py +++ b/python/pyspark/ml/tests/test_algorithms.py @@ -115,7 +115,7 @@ def test_output_columns(self): model = ovr.fit(df) output = model.transform(df) self.assertEqual(output.columns, ["label", "features", "rawPrediction", "prediction"]) - self.assertIsInstance(pred.schema["rawPrediction"].dataType, VectorUDT) + self.assertIsInstance(output.schema["rawPrediction"].dataType, VectorUDT) def test_parallelism_does_not_change_output(self): df = self.spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)), From 3c2ac9521d7e4ce60a06a0291b9abe466908340c Mon Sep 17 00:00:00 2001 From: harupy <17039389+harupy@users.noreply.github.com> Date: Tue, 20 Apr 2021 14:34:18 +0900 Subject: [PATCH 4/8] import VectorUDT --- python/pyspark/ml/tests/test_algorithms.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/ml/tests/test_algorithms.py b/python/pyspark/ml/tests/test_algorithms.py index bc8f8dffc8139..d21c556a17cef 100644 --- a/python/pyspark/ml/tests/test_algorithms.py +++ b/python/pyspark/ml/tests/test_algorithms.py @@ -25,7 +25,7 @@ MultilayerPerceptronClassifier, OneVsRest from pyspark.ml.clustering import DistributedLDAModel, KMeans, LocalLDAModel, LDA, LDAModel from pyspark.ml.fpm import FPGrowth -from pyspark.ml.linalg import Matrices, Vectors +from pyspark.ml.linalg import Matrices, Vectors, VectorUDT from pyspark.ml.recommendation import ALS from pyspark.ml.regression import GeneralizedLinearRegression, LinearRegression from pyspark.sql import Row From 98d241eab5c27d5f592c49d8c7b119a3ee4ea8cd Mon Sep 17 00:00:00 2001 From: harupy <17039389+harupy@users.noreply.github.com> Date: Wed, 21 Apr 2021 10:24:56 +0900 Subject: [PATCH 5/8] Create a separate test --- python/pyspark/ml/tests/test_algorithms.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/python/pyspark/ml/tests/test_algorithms.py b/python/pyspark/ml/tests/test_algorithms.py index d21c556a17cef..e375314797877 100644 --- a/python/pyspark/ml/tests/test_algorithms.py +++ b/python/pyspark/ml/tests/test_algorithms.py @@ -115,6 +115,18 @@ def test_output_columns(self): model = ovr.fit(df) output = model.transform(df) self.assertEqual(output.columns, ["label", "features", "rawPrediction", "prediction"]) + + def test_SPARK_35142(self): + # https://issues.apache.org/jira/browse/SPARK-35142 + df = self.spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)), + (1.0, Vectors.sparse(2, [], [])), + (2.0, Vectors.dense(0.5, 0.5))], + ["label", "features"]) + lr = LogisticRegression(maxIter=5, regParam=0.01) + ovr = OneVsRest(classifier=lr, parallelism=1) + model = ovr.fit(df) + output = model.transform(df).head() + self.assertEqual(output.columns, ["label", "features", "rawPrediction", "prediction"]) self.assertIsInstance(output.schema["rawPrediction"].dataType, VectorUDT) def test_parallelism_does_not_change_output(self): From 2f12765ad7f89c772113231777e8f638550bbf7e Mon Sep 17 00:00:00 2001 From: harupy <17039389+harupy@users.noreply.github.com> Date: Wed, 21 Apr 2021 10:35:04 +0900 Subject: [PATCH 6/8] rename test --- python/pyspark/ml/tests/test_algorithms.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyspark/ml/tests/test_algorithms.py b/python/pyspark/ml/tests/test_algorithms.py index e375314797877..2ef4e79cc7e13 100644 --- a/python/pyspark/ml/tests/test_algorithms.py +++ b/python/pyspark/ml/tests/test_algorithms.py @@ -116,8 +116,8 @@ def test_output_columns(self): output = model.transform(df) self.assertEqual(output.columns, ["label", "features", "rawPrediction", "prediction"]) - def test_SPARK_35142(self): - # https://issues.apache.org/jira/browse/SPARK-35142 + def test_raw_prediction_column_is_of_vector_type(self): + # SPARK-35142 df = self.spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)), (1.0, Vectors.sparse(2, [], [])), (2.0, Vectors.dense(0.5, 0.5))], From b6fabb3eec661805f2a89eb839d01f7d5625e0f8 Mon Sep 17 00:00:00 2001 From: harupy <17039389+harupy@users.noreply.github.com> Date: Wed, 21 Apr 2021 10:43:15 +0900 Subject: [PATCH 7/8] add comment --- python/pyspark/ml/tests/test_algorithms.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/ml/tests/test_algorithms.py b/python/pyspark/ml/tests/test_algorithms.py index 2ef4e79cc7e13..21a1282a1e079 100644 --- a/python/pyspark/ml/tests/test_algorithms.py +++ b/python/pyspark/ml/tests/test_algorithms.py @@ -117,7 +117,7 @@ def test_output_columns(self): self.assertEqual(output.columns, ["label", "features", "rawPrediction", "prediction"]) def test_raw_prediction_column_is_of_vector_type(self): - # SPARK-35142 + # SPARK-35142: `OneVsRestModel` outputs raw prediction as a string column df = self.spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)), (1.0, Vectors.sparse(2, [], [])), (2.0, Vectors.dense(0.5, 0.5))], From ed26d2cef4d321b0c5fee7a2a851f9535beb12c9 Mon Sep 17 00:00:00 2001 From: harupy <17039389+harupy@users.noreply.github.com> Date: Wed, 21 Apr 2021 12:49:31 +0900 Subject: [PATCH 8/8] Fix test failure --- python/pyspark/ml/tests/test_algorithms.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/python/pyspark/ml/tests/test_algorithms.py b/python/pyspark/ml/tests/test_algorithms.py index 21a1282a1e079..35ce48b926663 100644 --- a/python/pyspark/ml/tests/test_algorithms.py +++ b/python/pyspark/ml/tests/test_algorithms.py @@ -25,7 +25,7 @@ MultilayerPerceptronClassifier, OneVsRest from pyspark.ml.clustering import DistributedLDAModel, KMeans, LocalLDAModel, LDA, LDAModel from pyspark.ml.fpm import FPGrowth -from pyspark.ml.linalg import Matrices, Vectors, VectorUDT +from pyspark.ml.linalg import Matrices, Vectors, DenseVector from pyspark.ml.recommendation import ALS from pyspark.ml.regression import GeneralizedLinearRegression, LinearRegression from pyspark.sql import Row @@ -125,9 +125,8 @@ def test_raw_prediction_column_is_of_vector_type(self): lr = LogisticRegression(maxIter=5, regParam=0.01) ovr = OneVsRest(classifier=lr, parallelism=1) model = ovr.fit(df) - output = model.transform(df).head() - self.assertEqual(output.columns, ["label", "features", "rawPrediction", "prediction"]) - self.assertIsInstance(output.schema["rawPrediction"].dataType, VectorUDT) + row = model.transform(df).head() + self.assertIsInstance(row["rawPrediction"], DenseVector) def test_parallelism_does_not_change_output(self): df = self.spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)),