From 92c32dd012b18becf3ec02a04f664ab905ee0edf Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Sun, 26 Aug 2018 15:34:57 +0200 Subject: [PATCH 1/9] [SPARK-10697][ML] Add lift to Association rules --- .../org/apache/spark/ml/fpm/FPGrowth.scala | 48 +++++++++++++++---- .../spark/mllib/fpm/AssociationRules.scala | 29 +++++++++-- .../org/apache/spark/mllib/fpm/FPGrowth.scala | 20 ++++---- .../apache/spark/ml/fpm/FPGrowthSuite.scala | 6 +-- 4 files changed, 78 insertions(+), 25 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala b/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala index 85c483c387ad8..599fb1d728305 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala @@ -19,7 +19,7 @@ package org.apache.spark.ml.fpm import scala.reflect.ClassTag -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.annotation.{Experimental, Since} import org.apache.spark.ml.{Estimator, Model} @@ -187,7 +187,7 @@ class FPGrowth @Since("2.2.0") ( items.unpersist() } - copyValues(new FPGrowthModel(uid, frequentItems)).setParent(this) + copyValues(new FPGrowthModel(uid, frequentItems, parentModel.itemSupport)).setParent(this) } @Since("2.2.0") @@ -217,9 +217,13 @@ object FPGrowth extends DefaultParamsReadable[FPGrowth] { @Experimental class FPGrowthModel private[ml] ( @Since("2.2.0") override val uid: String, - @Since("2.2.0") @transient val freqItemsets: DataFrame) + @Since("2.2.0") @transient val freqItemsets: DataFrame, + private val itemSupport: Map[Any, Long]) extends Model[FPGrowthModel] with FPGrowthParams with MLWritable { + private[ml] def this(uid: String, freqItemsets: DataFrame) = + this(uid, freqItemsets, Map.empty) + /** @group setParam */ @Since("2.2.0") def setMinConfidence(value: Double): this.type = set(minConfidence, value) @@ -251,7 +255,7 @@ class FPGrowthModel private[ml] ( _cachedRules } else { _cachedRules = AssociationRules - .getAssociationRulesFromFP(freqItemsets, "items", "freq", $(minConfidence)) + .getAssociationRulesFromFP(freqItemsets, "items", "freq", $(minConfidence), itemSupport) _cachedMinConf = $(minConfidence) _cachedRules } @@ -301,7 +305,7 @@ class FPGrowthModel private[ml] ( @Since("2.2.0") override def copy(extra: ParamMap): FPGrowthModel = { - val copied = new FPGrowthModel(uid, freqItemsets) + val copied = new FPGrowthModel(uid, freqItemsets, itemSupport) copyValues(copied, extra).setParent(this.parent) } @@ -326,6 +330,19 @@ object FPGrowthModel extends MLReadable[FPGrowthModel] { DefaultParamsWriter.saveMetadata(instance, path, sc) val dataPath = new Path(path, "data").toString instance.freqItemsets.write.parquet(dataPath) + val itemDataType = instance.freqItemsets.schema(instance.getItemsCol).dataType match { + case ArrayType(et, _) => et + case other => other // we should never get here + } + val itemSupportPath = new Path(path, "itemSupport").toString + val itemSupportRows = instance.itemSupport.map { + case (item, support) => Row(item, support) + }.toSeq + val schema = StructType(Seq( + StructField("item", itemDataType, nullable = false), + StructField("support", LongType, nullable = false))) + sparkSession.createDataFrame(sc.parallelize(itemSupportRows), schema) + .repartition(1).write.parquet(itemSupportPath) } } @@ -338,7 +355,16 @@ object FPGrowthModel extends MLReadable[FPGrowthModel] { val metadata = DefaultParamsReader.loadMetadata(path, sc, className) val dataPath = new Path(path, 
"data").toString val frequentItems = sparkSession.read.parquet(dataPath) - val model = new FPGrowthModel(metadata.uid, frequentItems) + val itemSupportPath = new Path(path, "itemSupport") + val fs = FileSystem.get(sc.hadoopConfiguration) + val itemSupport = if (fs.exists(itemSupportPath)) { + sparkSession.read.parquet(itemSupportPath.toString).rdd.collect().map { + case Row(item: Any, support: Long) => item -> support + }.toMap + } else { + Map.empty[Any, Long] + } + val model = new FPGrowthModel(metadata.uid, frequentItems, itemSupport) metadata.getAndSetParams(model) model } @@ -361,20 +387,22 @@ private[fpm] object AssociationRules { dataset: Dataset[_], itemsCol: String, freqCol: String, - minConfidence: Double): DataFrame = { + minConfidence: Double, + itemSupport: Map[Any, Long]): DataFrame = { val freqItemSetRdd = dataset.select(itemsCol, freqCol).rdd .map(row => new FreqItemset(row.getSeq[T](0).toArray, row.getLong(1))) val rows = new MLlibAssociationRules() .setMinConfidence(minConfidence) - .run(freqItemSetRdd) - .map(r => Row(r.antecedent, r.consequent, r.confidence)) + .run(freqItemSetRdd, itemSupport.asInstanceOf[Map[T, Long]]) + .map(r => Row(r.antecedent, r.consequent, r.confidence, r.lift.orNull)) val dt = dataset.schema(itemsCol).dataType val schema = StructType(Seq( StructField("antecedent", dt, nullable = false), StructField("consequent", dt, nullable = false), - StructField("confidence", DoubleType, nullable = false))) + StructField("confidence", DoubleType, nullable = false), + StructField("lift", DoubleType))) val rules = dataset.sparkSession.createDataFrame(rows, schema) rules } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala index acb83ac31affd..3de2fd4f9d957 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala @@ -61,6 +61,18 @@ class AssociationRules private[fpm] ( */ @Since("1.5.0") def run[Item: ClassTag](freqItemsets: RDD[FreqItemset[Item]]): RDD[Rule[Item]] = { + run(freqItemsets, Map.empty[Item, Long]) + } + + /** + * Computes the association rules with confidence above `minConfidence`. + * @param freqItemsets frequent itemset model obtained from [[FPGrowth]] + * @return a `Set[Rule[Item]]` containing the association rules. The rules will be able to + * compute also the lift metric. 
+   */
+  @Since("2.4.0")
+  def run[Item: ClassTag](freqItemsets: RDD[FreqItemset[Item]],
+      itemSupport: Map[Item, Long]): RDD[Rule[Item]] = {
     // For candidate rule X => Y, generate (X, (Y, freq(X union Y)))
     val candidates = freqItemsets.flatMap { itemset =>
       val items = itemset.items
@@ -76,8 +88,13 @@ class AssociationRules private[fpm] (
     // Join to get (X, ((Y, freq(X union Y)), freq(X))), generate rules, and filter by confidence
     candidates.join(freqItemsets.map(x => (x.items.toSeq, x.freq)))
       .map { case (antecendent, ((consequent, freqUnion), freqAntecedent)) =>
-      new Rule(antecendent.toArray, consequent.toArray, freqUnion, freqAntecedent)
-    }.filter(_.confidence >= minConfidence)
+      new Rule(antecendent.toArray,
+        consequent.toArray,
+        freqUnion,
+        freqAntecedent,
+        // the consequent always contains exactly one element
+        itemSupport.get(consequent.head))
+    }.filter(_.confidence >= minConfidence)
   }

   /**
@@ -107,7 +124,8 @@ object AssociationRules {
       @Since("1.5.0") val antecedent: Array[Item],
       @Since("1.5.0") val consequent: Array[Item],
       freqUnion: Double,
-      freqAntecedent: Double) extends Serializable {
+      freqAntecedent: Double,
+      freqConsequent: Option[Long]) extends Serializable {

     /**
      * Returns the confidence of the rule.
@@ -116,6 +134,9 @@ object AssociationRules {
     @Since("1.5.0")
     def confidence: Double = freqUnion.toDouble / freqAntecedent

+    @Since("2.4.0")
+    def lift: Option[Double] = freqConsequent.map(fCons => confidence / fCons)
+
     require(antecedent.toSet.intersect(consequent.toSet).isEmpty, {
       val sharedItems = antecedent.toSet.intersect(consequent.toSet)
       s"A valid association rule must have disjoint antecedent and " +
@@ -142,7 +163,7 @@ object AssociationRules {

     override def toString: String = {
       s"${antecedent.mkString("{", ",", "}")} => " +
-        s"${consequent.mkString("{", ",", "}")}: ${confidence}"
+        s"${consequent.mkString("{", ",", "}")}: (confidence: $confidence; lift: $lift)"
     }
   }
 }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala
index 4f2b7e6f0764e..7ecef34014985 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala
@@ -48,9 +48,14 @@ import org.apache.spark.storage.StorageLevel
  * @tparam Item item type
  */
 @Since("1.3.0")
-class FPGrowthModel[Item: ClassTag] @Since("1.3.0") (
-    @Since("1.3.0") val freqItemsets: RDD[FreqItemset[Item]])
+class FPGrowthModel[Item: ClassTag] @Since("2.4.0") (
+    @Since("1.3.0") val freqItemsets: RDD[FreqItemset[Item]],
+    @Since("2.4.0") val itemSupport: Map[Item, Long])
   extends Saveable with Serializable {
+
+  @Since("1.3.0")
+  def this(freqItemsets: RDD[FreqItemset[Item]]) = this(freqItemsets, Map.empty)
+
   /**
    * Generates association rules for the `Item`s in [[freqItemsets]].
* @param confidence minimal confidence of the rules produced @@ -58,7 +63,7 @@ class FPGrowthModel[Item: ClassTag] @Since("1.3.0") ( @Since("1.5.0") def generateAssociationRules(confidence: Double): RDD[AssociationRules.Rule[Item]] = { val associationRules = new AssociationRules(confidence) - associationRules.run(freqItemsets) + associationRules.run(freqItemsets, itemSupport) } /** @@ -213,9 +218,9 @@ class FPGrowth private[spark] ( val minCount = math.ceil(minSupport * count).toLong val numParts = if (numPartitions > 0) numPartitions else data.partitions.length val partitioner = new HashPartitioner(numParts) - val freqItems = genFreqItems(data, minCount, partitioner) - val freqItemsets = genFreqItemsets(data, minCount, freqItems, partitioner) - new FPGrowthModel(freqItemsets) + val freqItemsCount = genFreqItems(data, minCount, partitioner) + val freqItemsets = genFreqItemsets(data, minCount, freqItemsCount.map(_._1), partitioner) + new FPGrowthModel(freqItemsets, freqItemsCount.toMap) } /** @@ -236,7 +241,7 @@ class FPGrowth private[spark] ( private def genFreqItems[Item: ClassTag]( data: RDD[Array[Item]], minCount: Long, - partitioner: Partitioner): Array[Item] = { + partitioner: Partitioner): Array[(Item, Long)] = { data.flatMap { t => val uniq = t.toSet if (t.length != uniq.size) { @@ -248,7 +253,6 @@ class FPGrowth private[spark] ( .filter(_._2 >= minCount) .collect() .sortBy(-_._2) - .map(_._1) } /** diff --git a/mllib/src/test/scala/org/apache/spark/ml/fpm/FPGrowthSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/fpm/FPGrowthSuite.scala index 87f8b9034dde8..9600d387b033d 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/fpm/FPGrowthSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/fpm/FPGrowthSuite.scala @@ -39,9 +39,9 @@ class FPGrowthSuite extends SparkFunSuite with MLlibTestSparkContext with Defaul val model = new FPGrowth().setMinSupport(0.5).fit(data) val generatedRules = model.setMinConfidence(0.5).associationRules val expectedRules = spark.createDataFrame(Seq( - (Array("2"), Array("1"), 1.0), - (Array("1"), Array("2"), 0.75) - )).toDF("antecedent", "consequent", "confidence") + (Array("2"), Array("1"), 1.0, 0.25), + (Array("1"), Array("2"), 0.75, 0.25) + )).toDF("antecedent", "consequent", "confidence", "lift") .withColumn("antecedent", col("antecedent").cast(ArrayType(dt))) .withColumn("consequent", col("consequent").cast(ArrayType(dt))) assert(expectedRules.sort("antecedent").rdd.collect().sameElements( From 7f052b875e6b13cd9af789f78a188e89ffeedf42 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Sun, 26 Aug 2018 16:48:23 +0200 Subject: [PATCH 2/9] fix mima --- project/MimaExcludes.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index cdc99a48e5b64..a99809ae6cc59 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -36,6 +36,8 @@ object MimaExcludes { // Exclude rules for 2.4.x lazy val v24excludes = v23excludes ++ Seq( + // [SPARK-10697][ML] Add lift to Association rules + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.fpm.AssociationRules#Rule.this"), // [SPARK-24296][CORE] Replicate large blocks as a stream. 
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.network.netty.NettyBlockRpcServer.this"), // [SPARK-23528] Add numIter to ClusteringSummary From 4c8b7beb7fe4f28d9f33306410d6237f19cadf72 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Mon, 27 Aug 2018 14:35:17 +0200 Subject: [PATCH 3/9] fix + address comments --- .../org/apache/spark/ml/fpm/FPGrowth.scala | 23 +++++++++---------- .../spark/mllib/fpm/AssociationRules.scala | 11 +++++---- .../org/apache/spark/mllib/fpm/FPGrowth.scala | 9 +++++--- .../apache/spark/ml/fpm/FPGrowthSuite.scala | 4 ++-- project/MimaExcludes.scala | 1 + python/pyspark/ml/tests.py | 4 ++-- 6 files changed, 28 insertions(+), 24 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala b/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala index 599fb1d728305..71752a2ff64d6 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala @@ -218,12 +218,9 @@ object FPGrowth extends DefaultParamsReadable[FPGrowth] { class FPGrowthModel private[ml] ( @Since("2.2.0") override val uid: String, @Since("2.2.0") @transient val freqItemsets: DataFrame, - private val itemSupport: Map[Any, Long]) + private val itemSupport: scala.collection.Map[Any, Double]) extends Model[FPGrowthModel] with FPGrowthParams with MLWritable { - private[ml] def this(uid: String, freqItemsets: DataFrame) = - this(uid, freqItemsets, Map.empty) - /** @group setParam */ @Since("2.2.0") def setMinConfidence(value: Double): this.type = set(minConfidence, value) @@ -332,7 +329,8 @@ object FPGrowthModel extends MLReadable[FPGrowthModel] { instance.freqItemsets.write.parquet(dataPath) val itemDataType = instance.freqItemsets.schema(instance.getItemsCol).dataType match { case ArrayType(et, _) => et - case other => other // we should never get here + case other => throw new RuntimeException(s"Expected ${ArrayType.simpleString}, but got " + + other.catalogString + ".") } val itemSupportPath = new Path(path, "itemSupport").toString val itemSupportRows = instance.itemSupport.map { @@ -340,7 +338,7 @@ object FPGrowthModel extends MLReadable[FPGrowthModel] { }.toSeq val schema = StructType(Seq( StructField("item", itemDataType, nullable = false), - StructField("support", LongType, nullable = false))) + StructField("support", DoubleType, nullable = false))) sparkSession.createDataFrame(sc.parallelize(itemSupportRows), schema) .repartition(1).write.parquet(itemSupportPath) } @@ -358,11 +356,11 @@ object FPGrowthModel extends MLReadable[FPGrowthModel] { val itemSupportPath = new Path(path, "itemSupport") val fs = FileSystem.get(sc.hadoopConfiguration) val itemSupport = if (fs.exists(itemSupportPath)) { - sparkSession.read.parquet(itemSupportPath.toString).rdd.collect().map { - case Row(item: Any, support: Long) => item -> support - }.toMap + sparkSession.read.parquet(itemSupportPath.toString).rdd.map { + case Row(item: Any, support: Double) => item -> support + }.collectAsMap() } else { - Map.empty[Any, Long] + Map.empty[Any, Double] } val model = new FPGrowthModel(metadata.uid, frequentItems, itemSupport) metadata.getAndSetParams(model) @@ -380,6 +378,7 @@ private[fpm] object AssociationRules { * @param itemsCol column name for frequent itemsets * @param freqCol column name for appearance count of the frequent itemsets * @param minConfidence minimum confidence for generating the association rules + * @param itemSupport map containing an item and its support * @return a 
DataFrame("antecedent"[Array], "consequent"[Array], "confidence"[Double]) * containing the association rules. */ @@ -388,13 +387,13 @@ private[fpm] object AssociationRules { itemsCol: String, freqCol: String, minConfidence: Double, - itemSupport: Map[Any, Long]): DataFrame = { + itemSupport: scala.collection.Map[T, Double]): DataFrame = { val freqItemSetRdd = dataset.select(itemsCol, freqCol).rdd .map(row => new FreqItemset(row.getSeq[T](0).toArray, row.getLong(1))) val rows = new MLlibAssociationRules() .setMinConfidence(minConfidence) - .run(freqItemSetRdd, itemSupport.asInstanceOf[Map[T, Long]]) + .run(freqItemSetRdd, itemSupport) .map(r => Row(r.antecedent, r.consequent, r.confidence, r.lift.orNull)) val dt = dataset.schema(itemsCol).dataType diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala index 3de2fd4f9d957..876774888608a 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala @@ -56,23 +56,24 @@ class AssociationRules private[fpm] ( /** * Computes the association rules with confidence above `minConfidence`. * @param freqItemsets frequent itemset model obtained from [[FPGrowth]] - * @return a `Set[Rule[Item]]` containing the association rules. + * @return a `RDD[Rule[Item]]` containing the association rules. * */ @Since("1.5.0") def run[Item: ClassTag](freqItemsets: RDD[FreqItemset[Item]]): RDD[Rule[Item]] = { - run(freqItemsets, Map.empty[Item, Long]) + run(freqItemsets, Map.empty[Item, Double]) } /** * Computes the association rules with confidence above `minConfidence`. * @param freqItemsets frequent itemset model obtained from [[FPGrowth]] - * @return a `Set[Rule[Item]]` containing the association rules. The rules will be able to + * @param itemSupport map containing an item and its support + * @return a `RDD[Rule[Item]]` containing the association rules. The rules will be able to * compute also the lift metric. */ @Since("2.4.0") def run[Item: ClassTag](freqItemsets: RDD[FreqItemset[Item]], - itemSupport: Map[Item, Long]): RDD[Rule[Item]] = { + itemSupport: scala.collection.Map[Item, Double]): RDD[Rule[Item]] = { // For candidate rule X => Y, generate (X, (Y, freq(X union Y))) val candidates = freqItemsets.flatMap { itemset => val items = itemset.items @@ -125,7 +126,7 @@ object AssociationRules { @Since("1.5.0") val consequent: Array[Item], freqUnion: Double, freqAntecedent: Double, - freqConsequent: Option[Long]) extends Serializable { + freqConsequent: Option[Double]) extends Serializable { /** * Returns the confidence of the rule. 
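For context on the hunks above: lift relates a rule's confidence to the baseline support of its consequent, lift(X => Y) = confidence(X => Y) / support(Y), which is why this commit switches itemSupport from raw Long counts to Double fractions. A minimal, self-contained Scala sketch of that arithmetic follows; RuleStats and its field names are illustrative, not Spark API:

    // Confidence and lift for a candidate rule X => Y.
    // freqUnion and freqAntecedent are absolute transaction counts;
    // supportConsequent is a fraction of all transactions, like the
    // Map[Item, Double] itemSupport introduced in this commit.
    case class RuleStats(
        freqUnion: Double,
        freqAntecedent: Double,
        supportConsequent: Option[Double]) {
      def confidence: Double = freqUnion / freqAntecedent
      // lift > 1.0 means X and Y co-occur more often than independence predicts
      def lift: Option[Double] = supportConsequent.map(confidence / _)
    }

    // Rule {1} => {2}: {1, 2} occurs in 3 transactions, {1} in 4, support({2}) = 0.75.
    val stats = RuleStats(freqUnion = 3.0, freqAntecedent = 4.0, supportConsequent = Some(0.75))
    assert(stats.confidence == 0.75) // 3.0 / 4.0
    assert(stats.lift.contains(1.0)) // 0.75 / 0.75

These are the same numbers behind the (0.75, 1.0) row expected by the updated FPGrowthSuite below.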
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala
index 7ecef34014985..3a1bc35186dc3 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala
@@ -50,7 +50,7 @@ import org.apache.spark.storage.StorageLevel
 @Since("1.3.0")
 class FPGrowthModel[Item: ClassTag] @Since("2.4.0") (
     @Since("1.3.0") val freqItemsets: RDD[FreqItemset[Item]],
-    @Since("2.4.0") val itemSupport: Map[Item, Long])
+    @Since("2.4.0") val itemSupport: Map[Item, Double])
   extends Saveable with Serializable {

   @Since("1.3.0")
@@ -220,7 +220,10 @@ class FPGrowth private[spark] (
     val partitioner = new HashPartitioner(numParts)
     val freqItemsCount = genFreqItems(data, minCount, partitioner)
     val freqItemsets = genFreqItemsets(data, minCount, freqItemsCount.map(_._1), partitioner)
-    new FPGrowthModel(freqItemsets, freqItemsCount.toMap)
+    val itemSupport = freqItemsCount.map {
+      case (item, cnt) => item -> cnt.toDouble / count
+    }.toMap
+    new FPGrowthModel(freqItemsets, itemSupport)
   }

   /**
@@ -236,7 +239,7 @@ class FPGrowth private[spark] (
    * Generates frequent items by filtering the input data using minimal support level.
    * @param minCount minimum count for frequent itemsets
    * @param partitioner partitioner used to distribute items
-   * @return array of frequent pattern ordered by their frequencies
+   * @return array of frequent patterns and their frequencies, ordered by descending frequency
    */
   private def genFreqItems[Item: ClassTag](
       data: RDD[Array[Item]],
diff --git a/mllib/src/test/scala/org/apache/spark/ml/fpm/FPGrowthSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/fpm/FPGrowthSuite.scala
index 9600d387b033d..b75526a48371a 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/fpm/FPGrowthSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/fpm/FPGrowthSuite.scala
@@ -39,8 +39,8 @@ class FPGrowthSuite extends SparkFunSuite with MLlibTestSparkContext with Defaul
     val model = new FPGrowth().setMinSupport(0.5).fit(data)
     val generatedRules = model.setMinConfidence(0.5).associationRules
     val expectedRules = spark.createDataFrame(Seq(
-      (Array("2"), Array("1"), 1.0, 0.25),
-      (Array("1"), Array("2"), 0.75, 0.25)
+      (Array("2"), Array("1"), 1.0, 1.0),
+      (Array("1"), Array("2"), 0.75, 1.0)
     )).toDF("antecedent", "consequent", "confidence", "lift")
       .withColumn("antecedent", col("antecedent").cast(ArrayType(dt)))
       .withColumn("consequent", col("consequent").cast(ArrayType(dt)))
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index a99809ae6cc59..f9443003e3788 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -37,6 +37,7 @@ object MimaExcludes {
   // Exclude rules for 2.4.x
   lazy val v24excludes = v23excludes ++ Seq(
     // [SPARK-10697][ML] Add lift to Association rules
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.fpm.FPGrowthModel.this"),
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.fpm.AssociationRules#Rule.this"),
     // [SPARK-24296][CORE] Replicate large blocks as a stream.
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.network.netty.NettyBlockRpcServer.this"), diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py index 5c87d1de4139b..625d9927f7063 100755 --- a/python/pyspark/ml/tests.py +++ b/python/pyspark/ml/tests.py @@ -2158,8 +2158,8 @@ def test_association_rules(self): fpm = fp.fit(self.data) expected_association_rules = self.spark.createDataFrame( - [([3], [1], 1.0), ([2], [1], 1.0)], - ["antecedent", "consequent", "confidence"] + [([3], [1], 1.0, 1.0), ([2], [1], 1.0, 1.0)], + ["antecedent", "consequent", "confidence", "lift"] ) actual_association_rules = fpm.associationRules From 5970876120eb2207994e38b9709faa4d6edbae5d Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Mon, 27 Aug 2018 16:37:00 +0200 Subject: [PATCH 4/9] address comments --- mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala b/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala index 71752a2ff64d6..71e1ba314f78a 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala @@ -329,8 +329,8 @@ object FPGrowthModel extends MLReadable[FPGrowthModel] { instance.freqItemsets.write.parquet(dataPath) val itemDataType = instance.freqItemsets.schema(instance.getItemsCol).dataType match { case ArrayType(et, _) => et - case other => throw new RuntimeException(s"Expected ${ArrayType.simpleString}, but got " + - other.catalogString + ".") + case other => throw new IllegalArgumentException( + s"Expected ${ArrayType.simpleString}, but got ${other.catalogString}.") } val itemSupportPath = new Path(path, "itemSupport").toString val itemSupportRows = instance.itemSupport.map { From 957a6a2cf0e05f01c2c2d602944b8da8cfb1b426 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Mon, 27 Aug 2018 18:35:03 +0200 Subject: [PATCH 5/9] compute itemSupport instead of saving them --- .../org/apache/spark/ml/fpm/FPGrowth.scala | 48 ++++++++----------- 1 file changed, 19 insertions(+), 29 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala b/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala index 71e1ba314f78a..ea53b839ba222 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala @@ -19,7 +19,9 @@ package org.apache.spark.ml.fpm import scala.reflect.ClassTag -import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.fs.Path +import org.json4s.{DefaultFormats, JObject} +import org.json4s.JsonDSL._ import org.apache.spark.annotation.{Experimental, Since} import org.apache.spark.ml.{Estimator, Model} @@ -175,7 +177,8 @@ class FPGrowth @Since("2.2.0") ( if (handlePersistence) { items.persist(StorageLevel.MEMORY_AND_DISK) } - + val inputRowCount = items.count() + instr.logNumExamples(inputRowCount) val parentModel = mllibFP.run(items) val rows = parentModel.freqItemsets.map(f => Row(f.items, f.freq)) val schema = StructType(Seq( @@ -187,7 +190,8 @@ class FPGrowth @Since("2.2.0") ( items.unpersist() } - copyValues(new FPGrowthModel(uid, frequentItems, parentModel.itemSupport)).setParent(this) + copyValues(new FPGrowthModel(uid, frequentItems, parentModel.itemSupport, inputRowCount)) + .setParent(this) } @Since("2.2.0") @@ -218,7 +222,8 @@ object FPGrowth extends DefaultParamsReadable[FPGrowth] { class FPGrowthModel private[ml] ( 
@Since("2.2.0") override val uid: String, @Since("2.2.0") @transient val freqItemsets: DataFrame, - private val itemSupport: scala.collection.Map[Any, Double]) + private val itemSupport: scala.collection.Map[Any, Double], + private val inputSize: Long) extends Model[FPGrowthModel] with FPGrowthParams with MLWritable { /** @group setParam */ @@ -302,7 +307,7 @@ class FPGrowthModel private[ml] ( @Since("2.2.0") override def copy(extra: ParamMap): FPGrowthModel = { - val copied = new FPGrowthModel(uid, freqItemsets, itemSupport) + val copied = new FPGrowthModel(uid, freqItemsets, itemSupport, inputSize) copyValues(copied, extra).setParent(this.parent) } @@ -324,23 +329,10 @@ object FPGrowthModel extends MLReadable[FPGrowthModel] { class FPGrowthModelWriter(instance: FPGrowthModel) extends MLWriter { override protected def saveImpl(path: String): Unit = { - DefaultParamsWriter.saveMetadata(instance, path, sc) + val extraMetadata: JObject = Map("count" -> instance.inputSize) + DefaultParamsWriter.saveMetadata(instance, path, sc, extraMetadata = Some(extraMetadata)) val dataPath = new Path(path, "data").toString instance.freqItemsets.write.parquet(dataPath) - val itemDataType = instance.freqItemsets.schema(instance.getItemsCol).dataType match { - case ArrayType(et, _) => et - case other => throw new IllegalArgumentException( - s"Expected ${ArrayType.simpleString}, but got ${other.catalogString}.") - } - val itemSupportPath = new Path(path, "itemSupport").toString - val itemSupportRows = instance.itemSupport.map { - case (item, support) => Row(item, support) - }.toSeq - val schema = StructType(Seq( - StructField("item", itemDataType, nullable = false), - StructField("support", DoubleType, nullable = false))) - sparkSession.createDataFrame(sc.parallelize(itemSupportRows), schema) - .repartition(1).write.parquet(itemSupportPath) } } @@ -350,19 +342,17 @@ object FPGrowthModel extends MLReadable[FPGrowthModel] { private val className = classOf[FPGrowthModel].getName override def load(path: String): FPGrowthModel = { + implicit val format = DefaultFormats val metadata = DefaultParamsReader.loadMetadata(path, sc, className) + val inputCount = (metadata.metadata \ "count").extract[Long] val dataPath = new Path(path, "data").toString val frequentItems = sparkSession.read.parquet(dataPath) - val itemSupportPath = new Path(path, "itemSupport") - val fs = FileSystem.get(sc.hadoopConfiguration) - val itemSupport = if (fs.exists(itemSupportPath)) { - sparkSession.read.parquet(itemSupportPath.toString).rdd.map { - case Row(item: Any, support: Double) => item -> support + val itemSupport = frequentItems.rdd.flatMap { + case Row(items: Seq[_], count: Long) if items.length == 1 => + Some(items.head -> count.toDouble / inputCount) + case _ => None }.collectAsMap() - } else { - Map.empty[Any, Double] - } - val model = new FPGrowthModel(metadata.uid, frequentItems, itemSupport) + val model = new FPGrowthModel(metadata.uid, frequentItems, itemSupport, inputCount) metadata.getAndSetParams(model) model } From 88eb571b732d42138b029ead106f4c8718e1e220 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Mon, 27 Aug 2018 23:33:18 +0200 Subject: [PATCH 6/9] fix R failure --- R/pkg/tests/fulltests/test_mllib_fpm.R | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/R/pkg/tests/fulltests/test_mllib_fpm.R b/R/pkg/tests/fulltests/test_mllib_fpm.R index 69dda52f0c279..d80f66a25de1c 100644 --- a/R/pkg/tests/fulltests/test_mllib_fpm.R +++ b/R/pkg/tests/fulltests/test_mllib_fpm.R @@ -44,7 +44,8 @@ 
test_that("spark.fpGrowth", { expected_association_rules <- data.frame( antecedent = I(list(list("2"), list("3"))), consequent = I(list(list("1"), list("1"))), - confidence = c(1, 1) + confidence = c(1, 1), + lift = c(1, 1) ) expect_equivalent(expected_association_rules, collect(spark.associationRules(model))) From 44a002103aaf89a6b17688e50ee351a1576389d5 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Tue, 28 Aug 2018 10:56:22 +0200 Subject: [PATCH 7/9] improve naming + old version read support --- .../org/apache/spark/ml/fpm/FPGrowth.scala | 32 +++++++++++++------ 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala b/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala index ea53b839ba222..103e7146446ab 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala @@ -36,6 +36,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._ import org.apache.spark.storage.StorageLevel +import org.apache.spark.util.VersionUtils /** * Common params for FPGrowth and FPGrowthModel @@ -223,7 +224,7 @@ class FPGrowthModel private[ml] ( @Since("2.2.0") override val uid: String, @Since("2.2.0") @transient val freqItemsets: DataFrame, private val itemSupport: scala.collection.Map[Any, Double], - private val inputSize: Long) + private val numTrainingRecords: Long) extends Model[FPGrowthModel] with FPGrowthParams with MLWritable { /** @group setParam */ @@ -307,7 +308,7 @@ class FPGrowthModel private[ml] ( @Since("2.2.0") override def copy(extra: ParamMap): FPGrowthModel = { - val copied = new FPGrowthModel(uid, freqItemsets, itemSupport, inputSize) + val copied = new FPGrowthModel(uid, freqItemsets, itemSupport, numTrainingRecords) copyValues(copied, extra).setParent(this.parent) } @@ -329,7 +330,7 @@ object FPGrowthModel extends MLReadable[FPGrowthModel] { class FPGrowthModelWriter(instance: FPGrowthModel) extends MLWriter { override protected def saveImpl(path: String): Unit = { - val extraMetadata: JObject = Map("count" -> instance.inputSize) + val extraMetadata: JObject = Map("numTrainingRecords" -> instance.numTrainingRecords) DefaultParamsWriter.saveMetadata(instance, path, sc, extraMetadata = Some(extraMetadata)) val dataPath = new Path(path, "data").toString instance.freqItemsets.write.parquet(dataPath) @@ -344,15 +345,26 @@ object FPGrowthModel extends MLReadable[FPGrowthModel] { override def load(path: String): FPGrowthModel = { implicit val format = DefaultFormats val metadata = DefaultParamsReader.loadMetadata(path, sc, className) - val inputCount = (metadata.metadata \ "count").extract[Long] + val (major, minor) = VersionUtils.majorMinorVersion(metadata.sparkVersion) + val numTrainingRecords = if (major.toInt < 2 || (major.toInt == 2 && minor.toInt < 4)) { + // 2.3 and before don't store the count + 0L + } else { + // 2.4+ + (metadata.metadata \ "numTrainingRecords").extract[Long] + } val dataPath = new Path(path, "data").toString val frequentItems = sparkSession.read.parquet(dataPath) - val itemSupport = frequentItems.rdd.flatMap { - case Row(items: Seq[_], count: Long) if items.length == 1 => - Some(items.head -> count.toDouble / inputCount) - case _ => None - }.collectAsMap() - val model = new FPGrowthModel(metadata.uid, frequentItems, itemSupport, inputCount) + val itemSupport = if (numTrainingRecords == 0L) { + Map.empty[Any, Double] + } else { + frequentItems.rdd.flatMap { + case 
Row(items: Seq[_], count: Long) if items.length == 1 =>
+          Some(items.head -> count.toDouble / numTrainingRecords)
+        case _ => None
+      }.collectAsMap()
+    }
+    val model = new FPGrowthModel(metadata.uid, frequentItems, itemSupport, numTrainingRecords)
       metadata.getAndSetParams(model)
       model
     }

From 706303f023b4353b2bcd76148af43ab39ba9df4d Mon Sep 17 00:00:00 2001
From: Marco Gaido
Date: Wed, 29 Aug 2018 09:41:31 +0200
Subject: [PATCH 8/9] address comments

---
 .../scala/org/apache/spark/mllib/fpm/AssociationRules.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala
index 876774888608a..16324ff74cf0e 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala
@@ -133,7 +133,7 @@ object AssociationRules {
      *
      */
     @Since("1.5.0")
-    def confidence: Double = freqUnion.toDouble / freqAntecedent
+    def confidence: Double = freqUnion / freqAntecedent

     @Since("2.4.0")
     def lift: Option[Double] = freqConsequent.map(fCons => confidence / fCons)

From 2407e05d4bfbbe71359cf5a57a856ab5514998cb Mon Sep 17 00:00:00 2001
From: Marco Gaido
Date: Fri, 31 Aug 2018 16:22:55 +0200
Subject: [PATCH 9/9] update docs in scala, python, R

---
 R/pkg/R/mllib_fpm.R                                    |  5 +++--
 .../main/scala/org/apache/spark/ml/fpm/FPGrowth.scala  | 10 +++++-----
 .../org/apache/spark/mllib/fpm/AssociationRules.scala  |  3 +++
 python/pyspark/ml/fpm.py                               |  3 ++-
 4 files changed, 13 insertions(+), 8 deletions(-)

diff --git a/R/pkg/R/mllib_fpm.R b/R/pkg/R/mllib_fpm.R
index e2394906d8012..4ad34fe82328f 100644
--- a/R/pkg/R/mllib_fpm.R
+++ b/R/pkg/R/mllib_fpm.R
@@ -116,10 +116,11 @@ setMethod("spark.freqItemsets", signature(object = "FPGrowthModel"),
 # Get association rules.

 #' @return A \code{SparkDataFrame} with association rules.
-#'         The \code{SparkDataFrame} contains three columns:
+#'         The \code{SparkDataFrame} contains four columns:
 #'         \code{antecedent} (an array of the same type as the input column),
 #'         \code{consequent} (an array of the same type as the input column),
-#'         and \code{condfidence} (confidence).
+#'         \code{confidence} (confidence for the rule),
+#'         and \code{lift} (lift for the rule).
 #' @rdname spark.fpGrowth
 #' @aliases associationRules,FPGrowthModel-method
 #' @note spark.associationRules(FPGrowthModel) since 2.2.0
diff --git a/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala b/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala
index 103e7146446ab..840a89b76d26b 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala
@@ -248,9 +248,9 @@ class FPGrowthModel private[ml] (
   @transient private var _cachedRules: DataFrame = _

   /**
-   * Get association rules fitted using the minConfidence. Returns a dataframe
-   * with three fields, "antecedent", "consequent" and "confidence", where "antecedent" and
-   * "consequent" are Array[T] and "confidence" is Double.
+   * Get association rules fitted using the minConfidence. Returns a dataframe with four fields,
+   * "antecedent", "consequent", "confidence" and "lift", where "antecedent" and "consequent" are
+   * Array[T], whereas "confidence" and "lift" are Double.
   */
  @Since("2.2.0")
  @transient def associationRules: DataFrame = {
@@ -381,8 +381,8 @@ private[fpm] object AssociationRules {
   * @param freqCol column name for appearance count of the frequent itemsets
   * @param minConfidence minimum confidence for generating the association rules
   * @param itemSupport map containing an item and its support
-   * @return a DataFrame("antecedent"[Array], "consequent"[Array], "confidence"[Double])
-   *         containing the association rules.
+   * @return a DataFrame("antecedent"[Array], "consequent"[Array], "confidence"[Double],
+   *         "lift"[Double]) containing the association rules.
   */
  def getAssociationRulesFromFP[T: ClassTag](
      dataset: Dataset[_],
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala
index 16324ff74cf0e..43d256bbc46c3 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala
@@ -135,6 +135,9 @@ object AssociationRules {
     @Since("1.5.0")
     def confidence: Double = freqUnion / freqAntecedent

+    /**
+     * Returns the lift of the rule.
+     */
     @Since("2.4.0")
     def lift: Option[Double] = freqConsequent.map(fCons => confidence / fCons)

diff --git a/python/pyspark/ml/fpm.py b/python/pyspark/ml/fpm.py
index f9394421e0cc4..c2b29b73460ff 100644
--- a/python/pyspark/ml/fpm.py
+++ b/python/pyspark/ml/fpm.py
@@ -145,10 +145,11 @@ def freqItemsets(self):
     @since("2.2.0")
     def associationRules(self):
         """
-        DataFrame with three columns:
+        DataFrame with four columns:
         * `antecedent` - Array of the same type as the input column.
         * `consequent` - Array of the same type as the input column.
         * `confidence` - Confidence for the rule (`DoubleType`).
+        * `lift` - Lift for the rule (`DoubleType`).
         """
         return self._call_java("associationRules")
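End to end, the new column then surfaces directly through the ML API. A minimal usage sketch, assuming an active SparkSession named `spark` (the toy transactions below are illustrative, not part of this series):

    import org.apache.spark.ml.fpm.FPGrowth

    // One transaction per row, encoded as an array of item strings.
    val dataset = spark.createDataFrame(Seq(
      "1 2 5",
      "1 2 3 5",
      "1 2"
    ).map(t => Tuple1(t.split(" ")))).toDF("items")

    val model = new FPGrowth()
      .setItemsCol("items")
      .setMinSupport(0.5)
      .setMinConfidence(0.6)
      .fit(dataset)

    // After this series, the rules DataFrame carries four columns:
    // antecedent, consequent, confidence and lift.
    model.associationRules.show()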