From 31226b4b8e5aa5fc016f61ec86c42683c452a696 Mon Sep 17 00:00:00 2001 From: Lu WANG Date: Thu, 26 Apr 2018 10:46:49 -0700 Subject: [PATCH 1/6] add Array input support for BisectingKMeans --- .../spark/ml/clustering/BisectingKMeans.scala | 35 ++++++++++++++----- .../apache/spark/ml/clustering/KMeans.scala | 1 - 2 files changed, 27 insertions(+), 9 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala index addc12ac52ec1..b36e5a1eb7423 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala @@ -32,8 +32,8 @@ import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors import org.apache.spark.mllib.linalg.VectorImplicits._ import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset, Row} -import org.apache.spark.sql.functions.{col, udf} -import org.apache.spark.sql.types.{IntegerType, StructType} +import org.apache.spark.sql.functions.udf +import org.apache.spark.sql.types.{ArrayType, DoubleType, FloatType, IntegerType, StructType} /** @@ -69,13 +69,24 @@ private[clustering] trait BisectingKMeansParams extends Params with HasMaxIter @Since("2.0.0") def getMinDivisibleClusterSize: Double = $(minDivisibleClusterSize) + /** + * Validates the input schema. + * @param schema input schema + */ + private[clustering] def validateSchema(schema: StructType): Unit = { + val typeCandidates = List( new VectorUDT, + new ArrayType(DoubleType, false), + new ArrayType(FloatType, false)) + + SchemaUtils.checkColumnTypes(schema, $(featuresCol), typeCandidates) + } /** * Validates and transforms the input schema. 
* @param schema input schema * @return output schema */ protected def validateAndTransformSchema(schema: StructType): StructType = { - SchemaUtils.checkColumnType(schema, $(featuresCol), new VectorUDT) + validateSchema(schema) SchemaUtils.appendColumn(schema, $(predictionCol), IntegerType) } } @@ -113,7 +124,8 @@ class BisectingKMeansModel private[ml] ( override def transform(dataset: Dataset[_]): DataFrame = { transformSchema(dataset.schema, logging = true) val predictUDF = udf((vector: Vector) => predict(vector)) - dataset.withColumn($(predictionCol), predictUDF(col($(featuresCol)))) + dataset.withColumn($(predictionCol), + predictUDF(DatasetUtils.columnToVector(dataset, getFeaturesCol))) } @Since("2.0.0") @@ -121,6 +133,8 @@ class BisectingKMeansModel private[ml] ( validateAndTransformSchema(schema) } + + private[clustering] def predict(features: Vector): Int = parentModel.predict(features) @Since("2.0.0") @@ -132,9 +146,12 @@ class BisectingKMeansModel private[ml] ( */ @Since("2.0.0") def computeCost(dataset: Dataset[_]): Double = { - SchemaUtils.checkColumnType(dataset.schema, $(featuresCol), new VectorUDT) - val data = dataset.select(col($(featuresCol))).rdd.map { case Row(point: Vector) => point } - parentModel.computeCost(data.map(OldVectors.fromML)) + validateSchema(dataset.schema) + val data: RDD[OldVector] = dataset.select(DatasetUtils.columnToVector(dataset, getFeaturesCol)) + .rdd.map { + case Row(point: Vector) => OldVectors.fromML(point) + } + parentModel.computeCost(data) } @Since("2.0.0") @@ -260,7 +277,9 @@ class BisectingKMeans @Since("2.0.0") ( @Since("2.0.0") override def fit(dataset: Dataset[_]): BisectingKMeansModel = { transformSchema(dataset.schema, logging = true) - val rdd: RDD[OldVector] = dataset.select(col($(featuresCol))).rdd.map { + val rdd: RDD[OldVector] = dataset.select( + DatasetUtils.columnToVector(dataset, getFeaturesCol)) + .rdd.map { case Row(point: Vector) => OldVectors.fromML(point) } diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala index de61c9c089a36..b04f1c4baf637 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala @@ -161,7 +161,6 @@ class KMeansModel private[ml] ( @Since("2.0.0") def computeCost(dataset: Dataset[_]): Double = { validateSchema(dataset.schema) - val data: RDD[OldVector] = dataset.select(DatasetUtils.columnToVector(dataset, getFeaturesCol)) .rdd.map { case Row(point: Vector) => OldVectors.fromML(point) From 45e6e96e974607ed0526401d0fdbb4f1c8161dd6 Mon Sep 17 00:00:00 2001 From: Lu WANG Date: Mon, 30 Apr 2018 10:14:41 -0700 Subject: [PATCH 2/6] add support of array input for all clustering methods --- .../spark/ml/clustering/BisectingKMeans.scala | 7 +-- .../spark/ml/clustering/GaussianMixture.scala | 26 ++++++++--- .../apache/spark/ml/clustering/KMeans.scala | 5 +-- .../org/apache/spark/ml/clustering/LDA.scala | 21 +++++++-- .../ml/clustering/BisectingKMeansSuite.scala | 36 ++++++++++++++++ .../ml/clustering/GaussianMixtureSuite.scala | 38 ++++++++++++++++ .../spark/ml/clustering/KMeansSuite.scala | 4 +- .../apache/spark/ml/clustering/LDASuite.scala | 43 ++++++++++++++++++- 8 files changed, 159 insertions(+), 21 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala index b36e5a1eb7423..514c58c0ea733 100644 --- 
a/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala @@ -133,8 +133,6 @@ class BisectingKMeansModel private[ml] ( validateAndTransformSchema(schema) } - - private[clustering] def predict(features: Vector): Int = parentModel.predict(features) @Since("2.0.0") @@ -277,9 +275,8 @@ class BisectingKMeans @Since("2.0.0") ( @Since("2.0.0") override def fit(dataset: Dataset[_]): BisectingKMeansModel = { transformSchema(dataset.schema, logging = true) - val rdd: RDD[OldVector] = dataset.select( - DatasetUtils.columnToVector(dataset, getFeaturesCol)) - .rdd.map { + val rdd: RDD[OldVector] = dataset + .select(DatasetUtils.columnToVector(dataset, getFeaturesCol)).rdd.map { case Row(point: Vector) => OldVectors.fromML(point) } diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala index b5804900c0358..de67964ead937 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala @@ -33,8 +33,8 @@ import org.apache.spark.mllib.linalg.{Matrices => OldMatrices, Matrix => OldMatr Vector => OldVector, Vectors => OldVectors} import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} -import org.apache.spark.sql.functions.{col, udf} -import org.apache.spark.sql.types.{IntegerType, StructType} +import org.apache.spark.sql.functions.udf +import org.apache.spark.sql.types.{ArrayType, DoubleType, FloatType, IntegerType, StructType} /** @@ -56,6 +56,18 @@ private[clustering] trait GaussianMixtureParams extends Params with HasMaxIter w @Since("2.0.0") def getK: Int = $(k) + /** + * Validates the input schema. + * @param schema input schema + */ + private[clustering] def validateSchema(schema: StructType): Unit = { + val typeCandidates = List( new VectorUDT, + new ArrayType(DoubleType, false), + new ArrayType(FloatType, false)) + + SchemaUtils.checkColumnTypes(schema, $(featuresCol), typeCandidates) + } + /** * Validates and transforms the input schema. 
* @@ -63,7 +75,7 @@ private[clustering] trait GaussianMixtureParams extends Params with HasMaxIter w * @return output schema */ protected def validateAndTransformSchema(schema: StructType): StructType = { - SchemaUtils.checkColumnType(schema, $(featuresCol), new VectorUDT) + validateSchema(schema) val schemaWithPredictionCol = SchemaUtils.appendColumn(schema, $(predictionCol), IntegerType) SchemaUtils.appendColumn(schemaWithPredictionCol, $(probabilityCol), new VectorUDT) } @@ -109,8 +121,9 @@ class GaussianMixtureModel private[ml] ( transformSchema(dataset.schema, logging = true) val predUDF = udf((vector: Vector) => predict(vector)) val probUDF = udf((vector: Vector) => predictProbability(vector)) - dataset.withColumn($(predictionCol), predUDF(col($(featuresCol)))) - .withColumn($(probabilityCol), probUDF(col($(featuresCol)))) + dataset + .withColumn($(predictionCol), predUDF(DatasetUtils.columnToVector(dataset, getFeaturesCol))) + .withColumn($(probabilityCol), probUDF(DatasetUtils.columnToVector(dataset, getFeaturesCol))) } @Since("2.0.0") @@ -340,7 +353,8 @@ class GaussianMixture @Since("2.0.0") ( val sc = dataset.sparkSession.sparkContext val numClusters = $(k) - val instances: RDD[Vector] = dataset.select(col($(featuresCol))).rdd.map { + val instances: RDD[Vector] = dataset + .select(DatasetUtils.columnToVector(dataset, getFeaturesCol)).rdd.map { case Row(features: Vector) => features }.cache() diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala index b04f1c4baf637..e66b6315a2753 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala @@ -350,9 +350,8 @@ class KMeans @Since("1.5.0") ( transformSchema(dataset.schema, logging = true) val handlePersistence = dataset.storageLevel == StorageLevel.NONE - val instances: RDD[OldVector] = dataset.select( - DatasetUtils.columnToVector(dataset, getFeaturesCol)) - .rdd.map { + val instances: RDD[OldVector] = dataset + .select(DatasetUtils.columnToVector(dataset, getFeaturesCol)).rdd.map { case Row(point: Vector) => OldVectors.fromML(point) } diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala index 47077230fac0a..07d65b8b6ad26 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala @@ -43,7 +43,7 @@ import org.apache.spark.mllib.util.MLUtils import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} import org.apache.spark.sql.functions.{col, monotonically_increasing_id, udf} -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{ArrayType, DoubleType, FloatType, StructType} import org.apache.spark.util.PeriodicCheckpointer import org.apache.spark.util.VersionUtils @@ -311,6 +311,18 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM @Since("2.0.0") def getKeepLastCheckpoint: Boolean = $(keepLastCheckpoint) + /** + * Validates the input schema. + * @param schema input schema + */ + private[clustering] def validateSchema(schema: StructType): Unit = { + val typeCandidates = List( new VectorUDT, + new ArrayType(DoubleType, false), + new ArrayType(FloatType, false)) + + SchemaUtils.checkColumnTypes(schema, $(featuresCol), typeCandidates) + } + /** * Validates and transforms the input schema. 
* @@ -345,7 +357,7 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM s" must be >= 1. Found value: $getTopicConcentration") } } - SchemaUtils.checkColumnType(schema, $(featuresCol), new VectorUDT) + validateSchema(schema) SchemaUtils.appendColumn(schema, $(topicDistributionCol), new VectorUDT) } @@ -461,7 +473,8 @@ abstract class LDAModel private[ml] ( val transformer = oldLocalModel.getTopicDistributionMethod val t = udf { (v: Vector) => transformer(OldVectors.fromML(v)).asML } - dataset.withColumn($(topicDistributionCol), t(col($(featuresCol)))).toDF() + dataset.withColumn($(topicDistributionCol), + t(DatasetUtils.columnToVector(dataset, getFeaturesCol))).toDF() } else { logWarning("LDAModel.transform was called without any output columns. Set an output column" + " such as topicDistributionCol to produce results.") @@ -938,7 +951,7 @@ object LDA extends MLReadable[LDA] { featuresCol: String): RDD[(Long, OldVector)] = { dataset .withColumn("docId", monotonically_increasing_id()) - .select("docId", featuresCol) + .select(col("docId"), DatasetUtils.columnToVector(dataset, featuresCol)) .rdd .map { case Row(docId: Long, features: Vector) => (docId, OldVectors.fromML(features)) diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala index 02880f96ae6d9..c102b40f3f226 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala @@ -24,6 +24,8 @@ import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils} import org.apache.spark.mllib.clustering.DistanceMeasure import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.sql.Dataset +import org.apache.spark.sql.functions.{col, udf} +import org.apache.spark.sql.types.{ArrayType, DoubleType, FloatType} class BisectingKMeansSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest { @@ -182,6 +184,40 @@ class BisectingKMeansSuite model.clusterCenters.forall(Vectors.norm(_, 2) == 1.0) } + + test("BisectingKMeans with Array input") { + val featuresColNameD = "array_double_features" + val featuresColNameF = "array_float_features" + val doubleUDF = udf { (features: Vector) => + val featureArray = Array.fill[Double](features.size)(0.0) + features.foreachActive((idx, value) => featureArray(idx) = value.toFloat) + featureArray + } + val floatUDF = udf { (features: Vector) => + val featureArray = Array.fill[Float](features.size)(0.0f) + features.foreachActive((idx, value) => featureArray(idx) = value.toFloat) + featureArray + } + val newdatasetD = dataset.withColumn(featuresColNameD, doubleUDF(col("features"))) + .drop("features") + val newdatasetF = dataset.withColumn(featuresColNameF, floatUDF(col("features"))) + .drop("features") + assert(newdatasetD.schema(featuresColNameD).dataType.equals(new ArrayType(DoubleType, false))) + assert(newdatasetF.schema(featuresColNameF).dataType.equals(new ArrayType(FloatType, false))) + + val bkmD = new BisectingKMeans() + .setK(k).setMaxIter(1).setFeaturesCol(featuresColNameD).setSeed(1) + val bkmF = new BisectingKMeans() + .setK(k).setMaxIter(1).setFeaturesCol(featuresColNameF).setSeed(1) + val modelD = bkmD.fit(newdatasetD) + val modelF = bkmF.fit(newdatasetF) + val transformedD = modelD.transform(newdatasetD) + val transformedF = modelF.transform(newdatasetF) + val predictDifference = 
transformedD.select("prediction") + .except(transformedF.select("prediction")) + assert(predictDifference.count() == 0) + assert(modelD.computeCost(newdatasetD) == modelF.computeCost(newdatasetF) ) + } } object BisectingKMeansSuite { diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala index 08b800b7e4183..0749775945720 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala @@ -25,6 +25,8 @@ import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils} import org.apache.spark.ml.util.TestingUtils._ import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.sql.{Dataset, Row} +import org.apache.spark.sql.functions.{col, udf} +import org.apache.spark.sql.types.{ArrayType, DoubleType, FloatType} class GaussianMixtureSuite extends SparkFunSuite with MLlibTestSparkContext @@ -256,6 +258,42 @@ class GaussianMixtureSuite extends SparkFunSuite with MLlibTestSparkContext val expectedMatrix = GaussianMixture.unpackUpperTriangularMatrix(4, triangularValues) assert(symmetricMatrix === expectedMatrix) } + + test("GaussianMixture with Array input") { + val featuresColNameD = "array_double_features" + val featuresColNameF = "array_float_features" + val doubleUDF = udf { (features: Vector) => + val featureArray = Array.fill[Double](features.size)(0.0) + features.foreachActive((idx, value) => featureArray(idx) = value.toFloat) + featureArray + } + val floatUDF = udf { (features: Vector) => + val featureArray = Array.fill[Float](features.size)(0.0f) + features.foreachActive((idx, value) => featureArray(idx) = value.toFloat) + featureArray + } + val newdatasetD = dataset.withColumn(featuresColNameD, doubleUDF(col("features"))) + .drop("features") + val newdatasetF = dataset.withColumn(featuresColNameF, floatUDF(col("features"))) + .drop("features") + assert(newdatasetD.schema(featuresColNameD).dataType.equals(new ArrayType(DoubleType, false))) + assert(newdatasetF.schema(featuresColNameF).dataType.equals(new ArrayType(FloatType, false))) + + val gmD = new GaussianMixture().setK(k).setMaxIter(1) + .setFeaturesCol(featuresColNameD).setSeed(1) + val gmF = new GaussianMixture().setK(k).setMaxIter(1) + .setFeaturesCol(featuresColNameF).setSeed(1) + val modelD = gmD.fit(newdatasetD) + val modelF = gmF.fit(newdatasetF) + val transformedD = modelD.transform(newdatasetD) + val transformedF = modelF.transform(newdatasetF) + val predictDifference = transformedD.select("prediction") + .except(transformedF.select("prediction")) + assert(predictDifference.count() == 0) + val probabilityDifference = transformedD.select("probability") + .except(transformedF.select("probability")) + assert(probabilityDifference.count() == 0) + } } object GaussianMixtureSuite extends SparkFunSuite { diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala index 5445ebe5c95eb..4d7d7dae6d07d 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala @@ -30,8 +30,8 @@ import org.apache.spark.mllib.clustering.{DistanceMeasure, KMeans => MLlibKMeans import org.apache.spark.mllib.linalg.{Vectors => MLlibVectors} import org.apache.spark.mllib.util.MLlibTestSparkContext import 
org.apache.spark.sql.{DataFrame, Dataset, SparkSession} -import org.apache.spark.sql.functions._ -import org.apache.spark.sql.types.{ArrayType, DoubleType, FloatType, IntegerType, StructType} +import org.apache.spark.sql.functions.{col, udf} +import org.apache.spark.sql.types.{ArrayType, DoubleType, FloatType} private[clustering] case class TestRow(features: Vector) diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala index e73bbc18d76bd..46173093e70ef 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala @@ -25,7 +25,8 @@ import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils} import org.apache.spark.ml.util.TestingUtils._ import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.sql._ - +import org.apache.spark.sql.functions.{col, udf} +import org.apache.spark.sql.types.{ArrayType, DoubleType, FloatType} object LDASuite { def generateLDAData( @@ -323,4 +324,44 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead assert(model.getOptimizer === optimizer) } } + + test("LDA with Array input") { + val featuresColNameD = "array_double_features" + val featuresColNameF = "array_float_features" + val doubleUDF = udf { (features: Vector) => + val featureArray = Array.fill[Double](features.size)(0.0) + features.foreachActive((idx, value) => featureArray(idx) = value.toFloat) + featureArray + } + val floatUDF = udf { (features: Vector) => + val featureArray = Array.fill[Float](features.size)(0.0f) + features.foreachActive((idx, value) => featureArray(idx) = value.toFloat) + featureArray + } + val newdatasetD = dataset.withColumn(featuresColNameD, doubleUDF(col("features"))) + .drop("features") + val newdatasetF = dataset.withColumn(featuresColNameF, floatUDF(col("features"))) + .drop("features") + assert(newdatasetD.schema(featuresColNameD).dataType.equals(new ArrayType(DoubleType, false))) + assert(newdatasetF.schema(featuresColNameF).dataType.equals(new ArrayType(FloatType, false))) + + val ldaD = new LDA().setK(k).setOptimizer("online") + .setMaxIter(1).setFeaturesCol(featuresColNameD).setSeed(1) + val ldaF = new LDA().setK(k).setOptimizer("online"). 
+ setMaxIter(1).setFeaturesCol(featuresColNameF).setSeed(1) + val modelD = ldaD.fit(newdatasetD) + val modelF = ldaF.fit(newdatasetF) + + // logLikelihood, logPerplexity + val llD = modelD.logLikelihood(newdatasetD) + val llF = modelF.logLikelihood(newdatasetF) + // assert(llD == llF) + assert(llD <= 0.0 && llD != Double.NegativeInfinity) + assert(llF <= 0.0 && llF != Double.NegativeInfinity) + val lpD = modelD.logPerplexity(newdatasetD) + val lpF = modelF.logPerplexity(newdatasetF) + // assert(lpD == lpF) + assert(lpD >= 0.0 && lpD != Double.NegativeInfinity) + assert(lpF >= 0.0 && lpF != Double.NegativeInfinity) + } } From 877c126ff493e43edb5a8bcf33e7dd1fe59503b0 Mon Sep 17 00:00:00 2001 From: Lu WANG Date: Thu, 3 May 2018 14:28:35 -0700 Subject: [PATCH 3/6] add general util functions in DatasetUtils and SchemaUtils --- .../spark/ml/clustering/BisectingKMeans.scala | 31 ++++--------------- .../spark/ml/clustering/GaussianMixture.scala | 16 ++-------- .../apache/spark/ml/clustering/KMeans.scala | 29 ++++------------- .../org/apache/spark/ml/clustering/LDA.scala | 14 +-------- .../apache/spark/ml/util/DatasetUtils.scala | 13 ++++++-- .../apache/spark/ml/util/SchemaUtils.scala | 16 +++++++++- 6 files changed, 41 insertions(+), 78 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala index 514c58c0ea733..438e53ba6197c 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala @@ -22,18 +22,16 @@ import org.apache.hadoop.fs.Path import org.apache.spark.SparkException import org.apache.spark.annotation.{Experimental, Since} import org.apache.spark.ml.{Estimator, Model} -import org.apache.spark.ml.linalg.{Vector, VectorUDT} +import org.apache.spark.ml.linalg.Vector import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared._ import org.apache.spark.ml.util._ import org.apache.spark.mllib.clustering.{BisectingKMeans => MLlibBisectingKMeans, BisectingKMeansModel => MLlibBisectingKMeansModel} -import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors} import org.apache.spark.mllib.linalg.VectorImplicits._ -import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset, Row} import org.apache.spark.sql.functions.udf -import org.apache.spark.sql.types.{ArrayType, DoubleType, FloatType, IntegerType, StructType} +import org.apache.spark.sql.types.{IntegerType, StructType} /** @@ -69,24 +67,13 @@ private[clustering] trait BisectingKMeansParams extends Params with HasMaxIter @Since("2.0.0") def getMinDivisibleClusterSize: Double = $(minDivisibleClusterSize) - /** - * Validates the input schema. - * @param schema input schema - */ - private[clustering] def validateSchema(schema: StructType): Unit = { - val typeCandidates = List( new VectorUDT, - new ArrayType(DoubleType, false), - new ArrayType(FloatType, false)) - - SchemaUtils.checkColumnTypes(schema, $(featuresCol), typeCandidates) - } /** * Validates and transforms the input schema. 
* @param schema input schema * @return output schema */ protected def validateAndTransformSchema(schema: StructType): StructType = { - validateSchema(schema) + SchemaUtils.validateVectorCompatibleColumn(schema, getFeaturesCol) SchemaUtils.appendColumn(schema, $(predictionCol), IntegerType) } } @@ -144,11 +131,8 @@ class BisectingKMeansModel private[ml] ( */ @Since("2.0.0") def computeCost(dataset: Dataset[_]): Double = { - validateSchema(dataset.schema) - val data: RDD[OldVector] = dataset.select(DatasetUtils.columnToVector(dataset, getFeaturesCol)) - .rdd.map { - case Row(point: Vector) => OldVectors.fromML(point) - } + SchemaUtils.validateVectorCompatibleColumn(dataset.schema, getFeaturesCol) + val data = DatasetUtils.columnToOldVector(dataset, getFeaturesCol) parentModel.computeCost(data) } @@ -275,10 +259,7 @@ class BisectingKMeans @Since("2.0.0") ( @Since("2.0.0") override def fit(dataset: Dataset[_]): BisectingKMeansModel = { transformSchema(dataset.schema, logging = true) - val rdd: RDD[OldVector] = dataset - .select(DatasetUtils.columnToVector(dataset, getFeaturesCol)).rdd.map { - case Row(point: Vector) => OldVectors.fromML(point) - } + val rdd = DatasetUtils.columnToOldVector(dataset, getFeaturesCol) val instr = Instrumentation.create(this, rdd) instr.logParams(featuresCol, predictionCol, k, maxIter, seed, minDivisibleClusterSize) diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala index de67964ead937..88d618c3a03a8 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala @@ -34,7 +34,7 @@ import org.apache.spark.mllib.linalg.{Matrices => OldMatrices, Matrix => OldMatr import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} import org.apache.spark.sql.functions.udf -import org.apache.spark.sql.types.{ArrayType, DoubleType, FloatType, IntegerType, StructType} +import org.apache.spark.sql.types.{IntegerType, StructType} /** @@ -56,18 +56,6 @@ private[clustering] trait GaussianMixtureParams extends Params with HasMaxIter w @Since("2.0.0") def getK: Int = $(k) - /** - * Validates the input schema. - * @param schema input schema - */ - private[clustering] def validateSchema(schema: StructType): Unit = { - val typeCandidates = List( new VectorUDT, - new ArrayType(DoubleType, false), - new ArrayType(FloatType, false)) - - SchemaUtils.checkColumnTypes(schema, $(featuresCol), typeCandidates) - } - /** * Validates and transforms the input schema. 
* @@ -75,7 +63,7 @@ private[clustering] trait GaussianMixtureParams extends Params with HasMaxIter w * @return output schema */ protected def validateAndTransformSchema(schema: StructType): StructType = { - validateSchema(schema) + SchemaUtils.validateVectorCompatibleColumn(schema, getFeaturesCol) val schemaWithPredictionCol = SchemaUtils.appendColumn(schema, $(predictionCol), IntegerType) SchemaUtils.appendColumn(schemaWithPredictionCol, $(probabilityCol), new VectorUDT) } diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala index e66b6315a2753..97f246fbfd859 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala @@ -24,7 +24,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.SparkException import org.apache.spark.annotation.{Experimental, Since} import org.apache.spark.ml.{Estimator, Model, PipelineStage} -import org.apache.spark.ml.linalg.{Vector, VectorUDT} +import org.apache.spark.ml.linalg.Vector import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared._ import org.apache.spark.ml.util._ @@ -34,7 +34,7 @@ import org.apache.spark.mllib.linalg.VectorImplicits._ import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} import org.apache.spark.sql.functions.udf -import org.apache.spark.sql.types.{ArrayType, DoubleType, FloatType, IntegerType, StructType} +import org.apache.spark.sql.types.{IntegerType, StructType} import org.apache.spark.storage.StorageLevel import org.apache.spark.util.VersionUtils.majorVersion @@ -86,24 +86,13 @@ private[clustering] trait KMeansParams extends Params with HasMaxIter with HasFe @Since("1.5.0") def getInitSteps: Int = $(initSteps) - /** - * Validates the input schema. - * @param schema input schema - */ - private[clustering] def validateSchema(schema: StructType): Unit = { - val typeCandidates = List( new VectorUDT, - new ArrayType(DoubleType, false), - new ArrayType(FloatType, false)) - - SchemaUtils.checkColumnTypes(schema, $(featuresCol), typeCandidates) - } /** * Validates and transforms the input schema. * @param schema input schema * @return output schema */ protected def validateAndTransformSchema(schema: StructType): StructType = { - validateSchema(schema) + SchemaUtils.validateVectorCompatibleColumn(schema, getFeaturesCol) SchemaUtils.appendColumn(schema, $(predictionCol), IntegerType) } } @@ -160,11 +149,8 @@ class KMeansModel private[ml] ( // TODO: Replace the temp fix when we have proper evaluators defined for clustering. 
@Since("2.0.0") def computeCost(dataset: Dataset[_]): Double = { - validateSchema(dataset.schema) - val data: RDD[OldVector] = dataset.select(DatasetUtils.columnToVector(dataset, getFeaturesCol)) - .rdd.map { - case Row(point: Vector) => OldVectors.fromML(point) - } + SchemaUtils.validateVectorCompatibleColumn(dataset.schema, getFeaturesCol) + val data = DatasetUtils.columnToOldVector(dataset, getFeaturesCol) parentModel.computeCost(data) } @@ -350,10 +336,7 @@ class KMeans @Since("1.5.0") ( transformSchema(dataset.schema, logging = true) val handlePersistence = dataset.storageLevel == StorageLevel.NONE - val instances: RDD[OldVector] = dataset - .select(DatasetUtils.columnToVector(dataset, getFeaturesCol)).rdd.map { - case Row(point: Vector) => OldVectors.fromML(point) - } + val instances = DatasetUtils.columnToOldVector(dataset, getFeaturesCol) if (handlePersistence) { instances.persist(StorageLevel.MEMORY_AND_DISK) diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala index 07d65b8b6ad26..afe599cd167cb 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala @@ -311,18 +311,6 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM @Since("2.0.0") def getKeepLastCheckpoint: Boolean = $(keepLastCheckpoint) - /** - * Validates the input schema. - * @param schema input schema - */ - private[clustering] def validateSchema(schema: StructType): Unit = { - val typeCandidates = List( new VectorUDT, - new ArrayType(DoubleType, false), - new ArrayType(FloatType, false)) - - SchemaUtils.checkColumnTypes(schema, $(featuresCol), typeCandidates) - } - /** * Validates and transforms the input schema. * @@ -357,7 +345,7 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM s" must be >= 1. 
Found value: $getTopicConcentration") } } - validateSchema(schema) + SchemaUtils.validateVectorCompatibleColumn(schema, getFeaturesCol) SchemaUtils.appendColumn(schema, $(topicDistributionCol), new VectorUDT) } diff --git a/mllib/src/main/scala/org/apache/spark/ml/util/DatasetUtils.scala b/mllib/src/main/scala/org/apache/spark/ml/util/DatasetUtils.scala index 52619cb65489a..6af4b3ebc2cc2 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/util/DatasetUtils.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/util/DatasetUtils.scala @@ -17,8 +17,10 @@ package org.apache.spark.ml.util -import org.apache.spark.ml.linalg.{Vectors, VectorUDT} -import org.apache.spark.sql.{Column, Dataset} +import org.apache.spark.ml.linalg.{Vector, Vectors, VectorUDT} +import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Column, Dataset, Row} import org.apache.spark.sql.functions.{col, udf} import org.apache.spark.sql.types.{ArrayType, DoubleType, FloatType} @@ -60,4 +62,11 @@ private[spark] object DatasetUtils { throw new IllegalArgumentException(s"$other column cannot be cast to Vector") } } + + def columnToOldVector(dataset: Dataset[_], colName: String): RDD[OldVector] = { + dataset.select(columnToVector(dataset, colName)) + .rdd.map { + case Row(point: Vector) => OldVectors.fromML(point) + } + } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/util/SchemaUtils.scala b/mllib/src/main/scala/org/apache/spark/ml/util/SchemaUtils.scala index 334410c9620de..2a6310ce8a637 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/util/SchemaUtils.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/util/SchemaUtils.scala @@ -17,7 +17,8 @@ package org.apache.spark.ml.util -import org.apache.spark.sql.types.{DataType, NumericType, StructField, StructType} +import org.apache.spark.ml.linalg.VectorUDT +import org.apache.spark.sql.types._ /** @@ -101,4 +102,17 @@ private[spark] object SchemaUtils { require(!schema.fieldNames.contains(col.name), s"Column ${col.name} already exists.") StructType(schema.fields :+ col) } + + /** + * Check whether the given column in the schema is one of the supporting vector type: Vector, + * Array[Dloat]. 
Array[Double] + * @param schema input schema + * @param colName column name + */ + def validateVectorCompatibleColumn(schema: StructType, colName: String): Unit = { + val typeCandidates = List( new VectorUDT, + new ArrayType(DoubleType, false), + new ArrayType(FloatType, false)) + checkColumnTypes(schema, colName, typeCandidates) + } } From c7a14bb14132ce8de084192b02fcdaa1922986a9 Mon Sep 17 00:00:00 2001 From: Lu WANG Date: Fri, 4 May 2018 16:28:14 -0700 Subject: [PATCH 4/6] modify and clean the unit tests --- .../apache/spark/ml/util/SchemaUtils.scala | 2 +- .../ml/clustering/BisectingKMeansSuite.scala | 52 +++++++---------- .../ml/clustering/GaussianMixtureSuite.scala | 57 ++++++++----------- .../spark/ml/clustering/KMeansSuite.scala | 51 ++++++----------- .../apache/spark/ml/clustering/LDASuite.scala | 43 ++++---------- .../apache/spark/ml/util/MLTestingUtils.scala | 10 ++++ 6 files changed, 82 insertions(+), 133 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/util/SchemaUtils.scala b/mllib/src/main/scala/org/apache/spark/ml/util/SchemaUtils.scala index 2a6310ce8a637..d9a3f85ef9a24 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/util/SchemaUtils.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/util/SchemaUtils.scala @@ -105,7 +105,7 @@ private[spark] object SchemaUtils { /** * Check whether the given column in the schema is one of the supporting vector type: Vector, - * Array[Dloat]. Array[Double] + * Array[Float]. Array[Double] * @param schema input schema * @param colName column name */ diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala index c102b40f3f226..ee5294f7c0086 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala @@ -17,15 +17,16 @@ package org.apache.spark.ml.clustering +import scala.language.existentials + import org.apache.spark.{SparkException, SparkFunSuite} import org.apache.spark.ml.linalg.{Vector, Vectors} import org.apache.spark.ml.param.ParamMap import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils} +import org.apache.spark.ml.util.TestingUtils._ import org.apache.spark.mllib.clustering.DistanceMeasure import org.apache.spark.mllib.util.MLlibTestSparkContext -import org.apache.spark.sql.Dataset -import org.apache.spark.sql.functions.{col, udf} -import org.apache.spark.sql.types.{ArrayType, DoubleType, FloatType} +import org.apache.spark.sql.{DataFrame, Dataset} class BisectingKMeansSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest { @@ -186,37 +187,24 @@ class BisectingKMeansSuite } test("BisectingKMeans with Array input") { - val featuresColNameD = "array_double_features" - val featuresColNameF = "array_float_features" - val doubleUDF = udf { (features: Vector) => - val featureArray = Array.fill[Double](features.size)(0.0) - features.foreachActive((idx, value) => featureArray(idx) = value.toFloat) - featureArray - } - val floatUDF = udf { (features: Vector) => - val featureArray = Array.fill[Float](features.size)(0.0f) - features.foreachActive((idx, value) => featureArray(idx) = value.toFloat) - featureArray + def trainTransfromAndComputeCost(dataset: Dataset[_]): (DataFrame, Double) = { + val model = new BisectingKMeans().setK(k).setMaxIter(1).setSeed(1).fit(dataset) + (model.transform(dataset), model.computeCost(dataset)) } - val newdatasetD = 
dataset.withColumn(featuresColNameD, doubleUDF(col("features"))) - .drop("features") - val newdatasetF = dataset.withColumn(featuresColNameF, floatUDF(col("features"))) - .drop("features") - assert(newdatasetD.schema(featuresColNameD).dataType.equals(new ArrayType(DoubleType, false))) - assert(newdatasetF.schema(featuresColNameF).dataType.equals(new ArrayType(FloatType, false))) - - val bkmD = new BisectingKMeans() - .setK(k).setMaxIter(1).setFeaturesCol(featuresColNameD).setSeed(1) - val bkmF = new BisectingKMeans() - .setK(k).setMaxIter(1).setFeaturesCol(featuresColNameF).setSeed(1) - val modelD = bkmD.fit(newdatasetD) - val modelF = bkmF.fit(newdatasetF) - val transformedD = modelD.transform(newdatasetD) - val transformedF = modelF.transform(newdatasetF) - val predictDifference = transformedD.select("prediction") + + val (newDatasetD, newDatasetF) = MLTestingUtils.generateArrayFeatureDataset(dataset) + val (transformed, trueCost) = trainTransfromAndComputeCost(dataset) + val (transformedD, doubleArrayCost) = trainTransfromAndComputeCost(newDatasetD) + val (transformedF, floatArrayCost) = trainTransfromAndComputeCost(newDatasetF) + + val predictDifferenceD = transformed.select("prediction") + .except(transformedD.select("prediction")) + assert(predictDifferenceD.count() == 0) + val predictDifferenceF = transformed.select("prediction") .except(transformedF.select("prediction")) - assert(predictDifference.count() == 0) - assert(modelD.computeCost(newdatasetD) == modelF.computeCost(newdatasetF) ) + assert(predictDifferenceF.count() == 0) + assert(trueCost ~== doubleArrayCost absTol 1e-6) + assert(trueCost ~== floatArrayCost absTol 1e-6) } } diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala index 0749775945720..9fd79723e7686 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.ml.clustering +import scala.language.existentials + import org.apache.spark.SparkFunSuite import org.apache.spark.ml.linalg.{DenseMatrix, Matrices, Vector, Vectors} import org.apache.spark.ml.param.ParamMap @@ -24,10 +26,7 @@ import org.apache.spark.ml.stat.distribution.MultivariateGaussian import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils} import org.apache.spark.ml.util.TestingUtils._ import org.apache.spark.mllib.util.MLlibTestSparkContext -import org.apache.spark.sql.{Dataset, Row} -import org.apache.spark.sql.functions.{col, udf} -import org.apache.spark.sql.types.{ArrayType, DoubleType, FloatType} - +import org.apache.spark.sql.{DataFrame, Dataset, Row} class GaussianMixtureSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest { @@ -260,39 +259,29 @@ class GaussianMixtureSuite extends SparkFunSuite with MLlibTestSparkContext } test("GaussianMixture with Array input") { - val featuresColNameD = "array_double_features" - val featuresColNameF = "array_float_features" - val doubleUDF = udf { (features: Vector) => - val featureArray = Array.fill[Double](features.size)(0.0) - features.foreachActive((idx, value) => featureArray(idx) = value.toFloat) - featureArray - } - val floatUDF = udf { (features: Vector) => - val featureArray = Array.fill[Float](features.size)(0.0f) - features.foreachActive((idx, value) => featureArray(idx) = value.toFloat) - featureArray + def 
trainAndTransfrom(dataset: Dataset[_]): DataFrame = { + val model = new GaussianMixture().setK(k).setMaxIter(1).setSeed(1).fit(dataset) + model.transform(dataset) } - val newdatasetD = dataset.withColumn(featuresColNameD, doubleUDF(col("features"))) - .drop("features") - val newdatasetF = dataset.withColumn(featuresColNameF, floatUDF(col("features"))) - .drop("features") - assert(newdatasetD.schema(featuresColNameD).dataType.equals(new ArrayType(DoubleType, false))) - assert(newdatasetF.schema(featuresColNameF).dataType.equals(new ArrayType(FloatType, false))) - - val gmD = new GaussianMixture().setK(k).setMaxIter(1) - .setFeaturesCol(featuresColNameD).setSeed(1) - val gmF = new GaussianMixture().setK(k).setMaxIter(1) - .setFeaturesCol(featuresColNameF).setSeed(1) - val modelD = gmD.fit(newdatasetD) - val modelF = gmF.fit(newdatasetF) - val transformedD = modelD.transform(newdatasetD) - val transformedF = modelF.transform(newdatasetF) - val predictDifference = transformedD.select("prediction") + + val (newDatasetD, newDatasetF) = MLTestingUtils.generateArrayFeatureDataset(dataset) + val transformed = trainAndTransfrom(dataset) + val transformedD = trainAndTransfrom(newDatasetD) + val transformedF = trainAndTransfrom(newDatasetF) + + val predictDifferenceD = transformed.select("prediction") + .except(transformedD.select("prediction")) + assert(predictDifferenceD.count() == 0) + val predictDifferenceF = transformed.select("prediction") .except(transformedF.select("prediction")) - assert(predictDifference.count() == 0) - val probabilityDifference = transformedD.select("probability") + assert(predictDifferenceF.count() == 0) + + val probabilityDifferenceD = transformed.select("probability") + .except(transformedD.select("probability")) + assert(probabilityDifferenceD.count() == 0) + val probabilityDifferenceF = transformed.select("probability") .except(transformedF.select("probability")) - assert(probabilityDifference.count() == 0) + assert(probabilityDifferenceF.count() == 0) } } diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala index 4d7d7dae6d07d..1058572027cd7 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.ml.clustering +import scala.language.existentials import scala.util.Random import org.dmg.pmml.{ClusteringModel, PMML} @@ -25,13 +26,11 @@ import org.apache.spark.{SparkException, SparkFunSuite} import org.apache.spark.ml.linalg.{Vector, Vectors} import org.apache.spark.ml.param.ParamMap import org.apache.spark.ml.util._ -import org.apache.spark.mllib.clustering.{DistanceMeasure, KMeans => MLlibKMeans, - KMeansModel => MLlibKMeansModel} +import org.apache.spark.ml.util.TestingUtils._ +import org.apache.spark.mllib.clustering.{DistanceMeasure, KMeans => MLlibKMeans, KMeansModel => MLlibKMeansModel} import org.apache.spark.mllib.linalg.{Vectors => MLlibVectors} import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} -import org.apache.spark.sql.functions.{col, udf} -import org.apache.spark.sql.types.{ArrayType, DoubleType, FloatType} private[clustering] case class TestRow(features: Vector) @@ -202,38 +201,24 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultR } test("KMean with Array input") { - val featuresColNameD = 
"array_double_features" - val featuresColNameF = "array_float_features" - - val doubleUDF = udf { (features: Vector) => - val featureArray = Array.fill[Double](features.size)(0.0) - features.foreachActive((idx, value) => featureArray(idx) = value.toFloat) - featureArray - } - val floatUDF = udf { (features: Vector) => - val featureArray = Array.fill[Float](features.size)(0.0f) - features.foreachActive((idx, value) => featureArray(idx) = value.toFloat) - featureArray + def trainTransfromAndComputeCost(dataset: Dataset[_]): (DataFrame, Double) = { + val model = new KMeans().setK(k).setMaxIter(1).setSeed(1).fit(dataset) + (model.transform(dataset), model.computeCost(dataset)) } - val newdatasetD = dataset.withColumn(featuresColNameD, doubleUDF(col("features"))) - .drop("features") - val newdatasetF = dataset.withColumn(featuresColNameF, floatUDF(col("features"))) - .drop("features") - assert(newdatasetD.schema(featuresColNameD).dataType.equals(new ArrayType(DoubleType, false))) - assert(newdatasetF.schema(featuresColNameF).dataType.equals(new ArrayType(FloatType, false))) - - val kmeansD = new KMeans().setK(k).setMaxIter(1).setFeaturesCol(featuresColNameD).setSeed(1) - val kmeansF = new KMeans().setK(k).setMaxIter(1).setFeaturesCol(featuresColNameF).setSeed(1) - val modelD = kmeansD.fit(newdatasetD) - val modelF = kmeansF.fit(newdatasetF) - val transformedD = modelD.transform(newdatasetD) - val transformedF = modelF.transform(newdatasetF) - - val predictDifference = transformedD.select("prediction") + val (newDatasetD, newDatasetF) = MLTestingUtils.generateArrayFeatureDataset(dataset) + val (transformed, trueCost) = trainTransfromAndComputeCost(dataset) + val (transformedD, doubleArrayCost) = trainTransfromAndComputeCost(newDatasetD) + val (transformedF, floatArrayCost) = trainTransfromAndComputeCost(newDatasetF) + + val predictDifferenceD = transformed.select("prediction") + .except(transformedD.select("prediction")) + assert(predictDifferenceD.count() == 0) + val predictDifferenceF = transformed.select("prediction") .except(transformedF.select("prediction")) - assert(predictDifference.count() == 0) - assert(modelD.computeCost(newdatasetD) == modelF.computeCost(newdatasetF) ) + assert(predictDifferenceF.count() == 0) + assert(trueCost ~== doubleArrayCost absTol 1e-6) + assert(trueCost ~== floatArrayCost absTol 1e-6) } diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala index 46173093e70ef..a7ffa1d793b8e 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.ml.clustering +import scala.language.existentials + import org.apache.hadoop.fs.Path import org.apache.spark.SparkFunSuite @@ -25,8 +27,6 @@ import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils} import org.apache.spark.ml.util.TestingUtils._ import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.sql._ -import org.apache.spark.sql.functions.{col, udf} -import org.apache.spark.sql.types.{ArrayType, DoubleType, FloatType} object LDASuite { def generateLDAData( @@ -326,41 +326,18 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead } test("LDA with Array input") { - val featuresColNameD = "array_double_features" - val featuresColNameF = "array_float_features" - val doubleUDF = udf { (features: Vector) => - val featureArray = 
Array.fill[Double](features.size)(0.0) - features.foreachActive((idx, value) => featureArray(idx) = value.toFloat) - featureArray - } - val floatUDF = udf { (features: Vector) => - val featureArray = Array.fill[Float](features.size)(0.0f) - features.foreachActive((idx, value) => featureArray(idx) = value.toFloat) - featureArray + def trainAndLogLikehoodAndPerplexity(dataset: Dataset[_]): (Double, Double) = { + val model = new LDA().setK(k).setOptimizer("online").setMaxIter(1).setSeed(1).fit(dataset) + (model.logLikelihood(dataset), model.logPerplexity(dataset)) } - val newdatasetD = dataset.withColumn(featuresColNameD, doubleUDF(col("features"))) - .drop("features") - val newdatasetF = dataset.withColumn(featuresColNameF, floatUDF(col("features"))) - .drop("features") - assert(newdatasetD.schema(featuresColNameD).dataType.equals(new ArrayType(DoubleType, false))) - assert(newdatasetF.schema(featuresColNameF).dataType.equals(new ArrayType(FloatType, false))) - - val ldaD = new LDA().setK(k).setOptimizer("online") - .setMaxIter(1).setFeaturesCol(featuresColNameD).setSeed(1) - val ldaF = new LDA().setK(k).setOptimizer("online"). - setMaxIter(1).setFeaturesCol(featuresColNameF).setSeed(1) - val modelD = ldaD.fit(newdatasetD) - val modelF = ldaF.fit(newdatasetF) - // logLikelihood, logPerplexity - val llD = modelD.logLikelihood(newdatasetD) - val llF = modelF.logLikelihood(newdatasetF) - // assert(llD == llF) + val (newDatasetD, newDatasetF) = MLTestingUtils.generateArrayFeatureDataset(dataset) + val (ll, lp) = trainAndLogLikehoodAndPerplexity(dataset) + val (llD, lpD) = trainAndLogLikehoodAndPerplexity(newDatasetD) + val (llF, lpF) = trainAndLogLikehoodAndPerplexity(newDatasetF) + // TODO: need to compare the result once we fix the seed issue for LDA (SPARK-22210) assert(llD <= 0.0 && llD != Double.NegativeInfinity) assert(llF <= 0.0 && llF != Double.NegativeInfinity) - val lpD = modelD.logPerplexity(newdatasetD) - val lpF = modelF.logPerplexity(newdatasetF) - // assert(lpD == lpF) assert(lpD >= 0.0 && lpD != Double.NegativeInfinity) assert(lpF >= 0.0 && lpF != Double.NegativeInfinity) } diff --git a/mllib/src/test/scala/org/apache/spark/ml/util/MLTestingUtils.scala b/mllib/src/test/scala/org/apache/spark/ml/util/MLTestingUtils.scala index c328d81b4bc3a..0fcff0b757723 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/util/MLTestingUtils.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/util/MLTestingUtils.scala @@ -247,4 +247,14 @@ object MLTestingUtils extends SparkFunSuite { } models.sliding(2).foreach { case Seq(m1, m2) => modelEquals(m1, m2)} } + + def generateArrayFeatureDataset(dataset: Dataset[_]): (Dataset[_], Dataset[_]) = { + val doubleUDF = udf { (features: Vector) => features.toArray.map(_.toFloat.toDouble)} + val floatUDF = udf { (features: Vector) => features.toArray.map(_.toFloat)} + val newDatasetD = dataset.withColumn("features", doubleUDF(col("features"))) + val newDatasetF = dataset.withColumn("features", floatUDF(col("features"))) + assert(newDatasetD.schema("features").dataType.equals(new ArrayType(DoubleType, false))) + assert(newDatasetF.schema("features").dataType.equals(new ArrayType(FloatType, false))) + (newDatasetD, newDatasetF) + } } From d065634ae70e5f0582eebcba84c90cc06e27e890 Mon Sep 17 00:00:00 2001 From: Lu WANG Date: Mon, 7 May 2018 11:09:10 -0700 Subject: [PATCH 5/6] fix typos and make the unit tests simpler --- .../ml/clustering/BisectingKMeansSuite.scala | 21 +++++-------- .../ml/clustering/GaussianMixtureSuite.scala | 30 +++++++------------ 
.../spark/ml/clustering/KMeansSuite.scala | 21 +++++-------- .../apache/spark/ml/clustering/LDASuite.scala | 10 +++---- .../apache/spark/ml/util/MLTestingUtils.scala | 19 ++++++++---- 5 files changed, 44 insertions(+), 57 deletions(-) diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala index ee5294f7c0086..f3ff2afcad2cd 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala @@ -187,22 +187,17 @@ class BisectingKMeansSuite } test("BisectingKMeans with Array input") { - def trainTransfromAndComputeCost(dataset: Dataset[_]): (DataFrame, Double) = { + def trainAndComputeCost(dataset: Dataset[_]): Double = { val model = new BisectingKMeans().setK(k).setMaxIter(1).setSeed(1).fit(dataset) - (model.transform(dataset), model.computeCost(dataset)) + model.computeCost(dataset) } - val (newDatasetD, newDatasetF) = MLTestingUtils.generateArrayFeatureDataset(dataset) - val (transformed, trueCost) = trainTransfromAndComputeCost(dataset) - val (transformedD, doubleArrayCost) = trainTransfromAndComputeCost(newDatasetD) - val (transformedF, floatArrayCost) = trainTransfromAndComputeCost(newDatasetF) - - val predictDifferenceD = transformed.select("prediction") - .except(transformedD.select("prediction")) - assert(predictDifferenceD.count() == 0) - val predictDifferenceF = transformed.select("prediction") - .except(transformedF.select("prediction")) - assert(predictDifferenceF.count() == 0) + val (newDataset, newDatasetD, newDatasetF) = MLTestingUtils.generateArrayFeatureDataset(dataset) + val trueCost = trainAndComputeCost(newDataset) + val doubleArrayCost = trainAndComputeCost(newDatasetD) + val floatArrayCost = trainAndComputeCost(newDatasetF) + + // checking the cost is fine enough as a sanity check assert(trueCost ~== doubleArrayCost absTol 1e-6) assert(trueCost ~== floatArrayCost absTol 1e-6) } diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala index 9fd79723e7686..cd99a3a5776c8 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala @@ -259,29 +259,19 @@ class GaussianMixtureSuite extends SparkFunSuite with MLlibTestSparkContext } test("GaussianMixture with Array input") { - def trainAndTransfrom(dataset: Dataset[_]): DataFrame = { + def trainAndComputlogLikelihood(dataset: Dataset[_]): Double = { val model = new GaussianMixture().setK(k).setMaxIter(1).setSeed(1).fit(dataset) - model.transform(dataset) + model.summary.logLikelihood } - val (newDatasetD, newDatasetF) = MLTestingUtils.generateArrayFeatureDataset(dataset) - val transformed = trainAndTransfrom(dataset) - val transformedD = trainAndTransfrom(newDatasetD) - val transformedF = trainAndTransfrom(newDatasetF) - - val predictDifferenceD = transformed.select("prediction") - .except(transformedD.select("prediction")) - assert(predictDifferenceD.count() == 0) - val predictDifferenceF = transformed.select("prediction") - .except(transformedF.select("prediction")) - assert(predictDifferenceF.count() == 0) - - val probabilityDifferenceD = transformed.select("probability") - .except(transformedD.select("probability")) - assert(probabilityDifferenceD.count() == 0) - val 
probabilityDifferenceF = transformed.select("probability") - .except(transformedF.select("probability")) - assert(probabilityDifferenceF.count() == 0) + val (newDataset, newDatasetD, newDatasetF) = MLTestingUtils.generateArrayFeatureDataset(dataset) + val trueLikelihood = trainAndComputlogLikelihood(newDataset) + val doubleLikelihood = trainAndComputlogLikelihood(newDatasetD) + val floatLikelihood = trainAndComputlogLikelihood(newDatasetF) + + // checking the cost is fine enough as a sanity check + assert(trueLikelihood == doubleLikelihood) + assert(trueLikelihood == floatLikelihood) } } diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala index 1058572027cd7..680a7c2034083 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala @@ -201,22 +201,17 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultR } test("KMean with Array input") { - def trainTransfromAndComputeCost(dataset: Dataset[_]): (DataFrame, Double) = { + def trainAndComputeCost(dataset: Dataset[_]): Double = { val model = new KMeans().setK(k).setMaxIter(1).setSeed(1).fit(dataset) - (model.transform(dataset), model.computeCost(dataset)) + model.computeCost(dataset) } - val (newDatasetD, newDatasetF) = MLTestingUtils.generateArrayFeatureDataset(dataset) - val (transformed, trueCost) = trainTransfromAndComputeCost(dataset) - val (transformedD, doubleArrayCost) = trainTransfromAndComputeCost(newDatasetD) - val (transformedF, floatArrayCost) = trainTransfromAndComputeCost(newDatasetF) - - val predictDifferenceD = transformed.select("prediction") - .except(transformedD.select("prediction")) - assert(predictDifferenceD.count() == 0) - val predictDifferenceF = transformed.select("prediction") - .except(transformedF.select("prediction")) - assert(predictDifferenceF.count() == 0) + val (newDataset, newDatasetD, newDatasetF) = MLTestingUtils.generateArrayFeatureDataset(dataset) + val trueCost = trainAndComputeCost(newDataset) + val doubleArrayCost = trainAndComputeCost(newDatasetD) + val floatArrayCost = trainAndComputeCost(newDatasetF) + + // checking the cost is fine enough as a sanity check assert(trueCost ~== doubleArrayCost absTol 1e-6) assert(trueCost ~== floatArrayCost absTol 1e-6) } diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala index a7ffa1d793b8e..e2b18de43f837 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala @@ -326,15 +326,15 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead } test("LDA with Array input") { - def trainAndLogLikehoodAndPerplexity(dataset: Dataset[_]): (Double, Double) = { + def trainAndLogLikelihoodAndPerplexity(dataset: Dataset[_]): (Double, Double) = { val model = new LDA().setK(k).setOptimizer("online").setMaxIter(1).setSeed(1).fit(dataset) (model.logLikelihood(dataset), model.logPerplexity(dataset)) } - val (newDatasetD, newDatasetF) = MLTestingUtils.generateArrayFeatureDataset(dataset) - val (ll, lp) = trainAndLogLikehoodAndPerplexity(dataset) - val (llD, lpD) = trainAndLogLikehoodAndPerplexity(newDatasetD) - val (llF, lpF) = trainAndLogLikehoodAndPerplexity(newDatasetF) + val (newDataset, newDatasetD, newDatasetF) = 
+      MLTestingUtils.generateArrayFeatureDataset(dataset)
+    val (ll, lp) = trainAndLogLikelihoodAndPerplexity(newDataset)
+    val (llD, lpD) = trainAndLogLikelihoodAndPerplexity(newDatasetD)
+    val (llF, lpF) = trainAndLogLikelihoodAndPerplexity(newDatasetF)
     // TODO: need to compare the result once we fix the seed issue for LDA (SPARK-22210)
     assert(llD <= 0.0 && llD != Double.NegativeInfinity)
     assert(llF <= 0.0 && llF != Double.NegativeInfinity)
diff --git a/mllib/src/test/scala/org/apache/spark/ml/util/MLTestingUtils.scala b/mllib/src/test/scala/org/apache/spark/ml/util/MLTestingUtils.scala
index 0fcff0b757723..7cdfeb902dc5a 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/util/MLTestingUtils.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/util/MLTestingUtils.scala
@@ -248,13 +248,20 @@ object MLTestingUtils extends SparkFunSuite {
     models.sliding(2).foreach { case Seq(m1, m2) => modelEquals(m1, m2)}
   }
 
-  def generateArrayFeatureDataset(dataset: Dataset[_]): (Dataset[_], Dataset[_]) = {
-    val doubleUDF = udf { (features: Vector) => features.toArray.map(_.toFloat.toDouble)}
-    val floatUDF = udf { (features: Vector) => features.toArray.map(_.toFloat)}
-    val newDatasetD = dataset.withColumn("features", doubleUDF(col("features")))
-    val newDatasetF = dataset.withColumn("features", floatUDF(col("features")))
+  /**
+   * Helper function for testing different input types for features. Given a DataFrame, generate
+   * three output DataFrames: one having vector feature column with float precision, one having
+   * double array feature column with float precision, and one having float array feature column.
+   */
+  def generateArrayFeatureDataset(dataset: Dataset[_]): (Dataset[_], Dataset[_], Dataset[_]) = {
+    val toFloatVectorUDF = udf { (features: Vector) => features.toArray.map(_.toFloat).toVector}
+    val toDoubleArrayUDF = udf { (features: Vector) => features.toArray}
+    val toFloatArrayUDF = udf { (features: Vector) => features.toArray.map(_.toFloat)}
+    val newDataset = dataset.withColumn("features", toFloatVectorUDF(col("features")))
+    val newDatasetD = dataset.withColumn("features", toDoubleArrayUDF(col("features")))
+    val newDatasetF = dataset.withColumn("features", toFloatArrayUDF(col("features")))
     assert(newDatasetD.schema("features").dataType.equals(new ArrayType(DoubleType, false)))
     assert(newDatasetF.schema("features").dataType.equals(new ArrayType(FloatType, false)))
-    (newDatasetD, newDatasetF)
+    (newDataset, newDatasetD, newDatasetF)
   }
 }

From c9f478e990da34819828efc26c11149909e95ccc Mon Sep 17 00:00:00 2001
From: Lu WANG
Date: Mon, 7 May 2018 15:11:07 -0700
Subject: [PATCH 6/6] minor bug fix

---
 .../ml/clustering/GaussianMixtureSuite.scala  |  4 +--
 .../apache/spark/ml/clustering/LDASuite.scala |  2 +-
 .../apache/spark/ml/util/MLTestingUtils.scala | 26 +++++++++++--------
 3 files changed, 18 insertions(+), 14 deletions(-)

diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala
index cd99a3a5776c8..d0d461a42711a 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala
@@ -270,8 +270,8 @@ class GaussianMixtureSuite extends SparkFunSuite with MLlibTestSparkContext
     val floatLikelihood = trainAndComputeLogLikelihood(newDatasetF)
 
     // checking the log-likelihood is fine enough as a sanity check
-    assert(trueLikelihood == doubleLikelihood)
-    assert(trueLikelihood == floatLikelihood)
+    assert(trueLikelihood ~== doubleLikelihood absTol 1e-6)
+    assert(trueLikelihood ~== floatLikelihood absTol 1e-6)
   }
 }
diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala
index e2b18de43f837..8d728f063dd8c 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala
@@ -335,7 +335,7 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead
     val (ll, lp) = trainAndLogLikelihoodAndPerplexity(newDataset)
     val (llD, lpD) = trainAndLogLikelihoodAndPerplexity(newDatasetD)
     val (llF, lpF) = trainAndLogLikelihoodAndPerplexity(newDatasetF)
-    // TODO: need to compare the result once we fix the seed issue for LDA (SPARK-22210)
+    // TODO: need to compare the results once we fix the seed issue for LDA (SPARK-22210)
     assert(llD <= 0.0 && llD != Double.NegativeInfinity)
     assert(llF <= 0.0 && llF != Double.NegativeInfinity)
     assert(lpD >= 0.0 && lpD != Double.NegativeInfinity)
diff --git a/mllib/src/test/scala/org/apache/spark/ml/util/MLTestingUtils.scala b/mllib/src/test/scala/org/apache/spark/ml/util/MLTestingUtils.scala
index 7cdfeb902dc5a..5e72b4d864c1d 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/util/MLTestingUtils.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/util/MLTestingUtils.scala
@@ -21,7 +21,7 @@ import org.apache.spark.SparkFunSuite
 import org.apache.spark.ml._
 import org.apache.spark.ml.evaluation.Evaluator
 import org.apache.spark.ml.feature.{Instance, LabeledPoint}
-import org.apache.spark.ml.linalg.{Vector, Vectors}
+import org.apache.spark.ml.linalg.{Vector, Vectors, VectorUDT}
 import org.apache.spark.ml.param.ParamMap
 import org.apache.spark.ml.param.shared.{HasFeaturesCol, HasLabelCol, HasWeightCol}
 import org.apache.spark.ml.recommendation.{ALS, ALSModel}
@@ -249,19 +249,23 @@ object MLTestingUtils extends SparkFunSuite {
   }
 
   /**
-   * Helper function for testing different input types for features. Given a DataFrame, generate
-   * three output DataFrames: one having vector feature column with float precision, one having
-   * double array feature column with float precision, and one having float array feature column.
+   * Helper function for testing different input types for "features" column. Given a DataFrame,
+   * generate three output DataFrames: one having vector "features" column with float precision,
+   * one having double array "features" column with float precision, and one having float array
+   * "features" column.
    */
-  def generateArrayFeatureDataset(dataset: Dataset[_]): (Dataset[_], Dataset[_], Dataset[_]) = {
-    val toFloatVectorUDF = udf { (features: Vector) => features.toArray.map(_.toFloat).toVector}
+  def generateArrayFeatureDataset(dataset: Dataset[_],
+      featuresColName: String = "features"): (Dataset[_], Dataset[_], Dataset[_]) = {
+    val toFloatVectorUDF = udf { (features: Vector) =>
+      Vectors.dense(features.toArray.map(_.toFloat.toDouble))}
     val toDoubleArrayUDF = udf { (features: Vector) => features.toArray}
     val toFloatArrayUDF = udf { (features: Vector) => features.toArray.map(_.toFloat)}
-    val newDataset = dataset.withColumn("features", toFloatVectorUDF(col("features")))
-    val newDatasetD = dataset.withColumn("features", toDoubleArrayUDF(col("features")))
-    val newDatasetF = dataset.withColumn("features", toFloatArrayUDF(col("features")))
-    assert(newDatasetD.schema("features").dataType.equals(new ArrayType(DoubleType, false)))
-    assert(newDatasetF.schema("features").dataType.equals(new ArrayType(FloatType, false)))
+    val newDataset = dataset.withColumn(featuresColName, toFloatVectorUDF(col(featuresColName)))
+    val newDatasetD = newDataset.withColumn(featuresColName, toDoubleArrayUDF(col(featuresColName)))
+    val newDatasetF = newDataset.withColumn(featuresColName, toFloatArrayUDF(col(featuresColName)))
+    assert(newDataset.schema(featuresColName).dataType.equals(new VectorUDT))
+    assert(newDatasetD.schema(featuresColName).dataType.equals(new ArrayType(DoubleType, false)))
+    assert(newDatasetF.schema(featuresColName).dataType.equals(new ArrayType(FloatType, false)))
     (newDataset, newDatasetD, newDatasetF)
   }
 }
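
A minimal usage sketch of what this series enables, for illustration only; it is not part of the patches above. It assumes a Spark build that includes these changes and a running SparkSession, and the app name, data values, and column name below are illustrative:

    import org.apache.spark.ml.clustering.KMeans
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("ArrayFeaturesExample").getOrCreate()
    import spark.implicits._

    // The features column is an Array[Double] rather than an ml.linalg.Vector;
    // before this series the clustering estimators only accepted a VectorUDT column here.
    val df = Seq(
      Array(0.0, 0.0),
      Array(1.0, 1.0),
      Array(9.0, 8.0),
      Array(8.0, 9.0)
    ).toDF("features")

    // Array[Double] and Array[Float] feature columns are now validated and converted internally.
    val model = new KMeans().setK(2).setSeed(1L).fit(df)
    model.transform(df).show()

Note that the test helper above truncates all three generated datasets to float precision, so the vector, double-array, and float-array runs see numerically equivalent inputs; the suites still compare costs and log-likelihoods with absTol 1e-6 rather than exact equality to stay robust to floating-point round-off.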