From e0686a7f2226451d7d7858fdc4af4d246e08dbb0 Mon Sep 17 00:00:00 2001
From: "jose.cambronero" 
Date: Mon, 13 Jul 2015 19:07:47 -0700
Subject: [PATCH 1/6] cleaned up 2-sample Kolmogorov test implementation to
 incorporate relevant feedback from the 1-sample test

---
 docs/mllib-statistics.md                      |  16 ++-
 .../apache/spark/mllib/stat/Statistics.scala  |  14 +++
 .../stat/test/KolmogorovSmirnovTest.scala     |  89 ++++++++++++++
 .../mllib/stat/HypothesisTestSuite.scala      | 111 ++++++++++++++++++
 4 files changed, 228 insertions(+), 2 deletions(-)

diff --git a/docs/mllib-statistics.md b/docs/mllib-statistics.md
index de5d6485f9b5f..db8098280fe5f 100644
--- a/docs/mllib-statistics.md
+++ b/docs/mllib-statistics.md
@@ -431,11 +431,16 @@ user tests against the normal distribution (`distName="norm"`), but does not pro
 parameters, the test initializes to the standard normal distribution and logs an appropriate
 message.
 
+There is also a 2-sample, 2-sided implementation available, which tests if the 2 samples are drawn
+from the same distribution. It is worth noting that the test assumes that all elements
+are unique, both within and across the 2 samples, and thus no ranking ties should occur.
+Given that the test is for continuous distributions this should not be an onerous requirement.
+
 
 [`Statistics`](api/scala/index.html#org.apache.spark.mllib.stat.Statistics$) provides methods to
-run a 1-sample, 2-sided Kolmogorov-Smirnov test. The following example demonstrates how to run
-and interpret the hypothesis tests.
+run a 1-sample, 2-sided Kolmogorov-Smirnov test and a 2-sample, 2-sided Kolmogorov-Smirnov test.
+The following example demonstrates how to run and interpret the hypothesis tests.
 
 {% highlight scala %}
 import org.apache.spark.SparkContext
@@ -452,6 +457,13 @@ println(testResult) // summary of the test including the p-value, test statistic
 // perform a KS test using a cumulative distribution function of our making
 val myCDF: Double => Double = ...
 val testResult2 = Statistics.kolmogorovSmirnovTest(data, myCDF)
+
+val data2: RDD[Double] = ... // another RDD of sample data
+// run a KS test for data vs data2
+// this corresponds to a 2-sample test
+// the statistic provides a test for the null hypothesis that both samples are drawn from the
+// same distribution
+val ksTestResult2 = Statistics.kolmogorovSmirnovTest2(data, data2)
 {% endhighlight %}
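For intuition about what the new 2-sample call computes: the statistic is the largest absolute gap between the two samples' empirical CDFs. A minimal local (single-machine) sketch of that quantity, assuming no ties (`localTwoSampleKS` is an illustrative name, not an MLlib API, and this is not the distributed implementation in the diffs that follow):

{% highlight scala %}
// Illustrative sketch only: computes the 2-sample KS statistic locally,
// assuming all values are distinct (no ties within or across samples).
def localTwoSampleKS(sample1: Array[Double], sample2: Array[Double]): Double = {
  val n1 = sample1.length.toDouble
  val n2 = sample2.length.toDouble
  // tag each value with its sample of origin and co-sort, as the RDD version does
  val coSorted = (sample1.map((_, true)) ++ sample2.map((_, false))).sortBy(_._1)
  var ix1 = 0
  var ix2 = 0
  var maxDist = 0.0
  for ((_, isSample1) <- coSorted) {
    if (isSample1) ix1 += 1 else ix2 += 1
    // ix1 / n1 and ix2 / n2 are the two empirical CDFs evaluated at this value
    maxDist = math.max(maxDist, math.abs(ix1 / n1 - ix2 / n2))
  }
  maxDist
}
{% endhighlight %}

For example, `localTwoSampleKS(Array(1.0, 3.0), Array(2.0, 4.0))` returns 0.5, since the two empirical CDFs are never further apart than one half.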
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala index 90332028cfb3a..50f38d7c3afcc 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala @@ -196,4 +196,18 @@ object Statistics { : KolmogorovSmirnovTestResult = { KolmogorovSmirnovTest.testOneSample(data, distName, params: _*) } + + /** + * Perform a two-sample, two-sided Kolmogorov-Smirnov test for probability distribution equality + * The null hypothesis corresponds to both samples coming from the same distribution + * @param data1 `RDD[Double]` first sample of data + * @param data2 `RDD[Double]` second sample of data + * @return [[org.apache.spark.mllib.stat.test.KolmogorovSmirnovTestResult]] object containing test + * statistic, p-value, and null hypothesis + */ + def kolmogorovSmirnovTest2(data1: RDD[Double], data2: RDD[Double]) + : KolmogorovSmirnovTestResult = { + KolmogorovSmirnovTest.testTwoSamples(data1, data2) + } + } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala index d89b0059d83f3..83d76f914bd02 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala @@ -53,6 +53,7 @@ private[stat] object KolmogorovSmirnovTest extends Logging { object NullHypothesis extends Enumeration { type NullHypothesis = Value val OneSampleTwoSided = Value("Sample follows theoretical distribution") + val TwoSampleTwoSided = Value("Both samples follow the same distribution.") } /** @@ -190,5 +191,93 @@ private[stat] object KolmogorovSmirnovTest extends Logging { val pval = 1 - new KolmogorovSmirnovTest().cdf(ksStat, n.toInt) new KolmogorovSmirnovTestResult(pval, ksStat, NullHypothesis.OneSampleTwoSided.toString) } + + /** + * Implements a two-sample, two-sided Kolmogorov-Smirnov test, which tests if the 2 samples + * come from the same distribution + * @param data1 `RDD[Double]` first sample of data + * @param data2 `RDD[Double]` second sample of data + * @return [[org.apache.spark.mllib.stat.test.KolmogorovSmirnovTestResult]] with the test + * statistic, p-value, and appropriate null hypothesis + */ + def testTwoSamples(data1: RDD[Double], data2: RDD[Double]): KolmogorovSmirnovTestResult = { + val n1 = data1.count().toDouble + val n2 = data2.count().toDouble + val isSample1 = true // identifier for sample 1, needed after co-sort + // combine identified samples + val joinedData = data1.map(x => (x, isSample1)) ++ data2.map(x => (x, !isSample1)) + // co-sort and operate on each partition + val localData = joinedData.sortBy { case (v, id) => v }.mapPartitions { part => + searchTwoSampleCandidates(part, n1, n2) // local extrema + }.collect() + val ksStat = searchTwoSampleStatistic(localData, n1 * n2) // result: global extreme + evalTwoSampleP(ksStat, n1.toInt, n2.toInt) + } + + /** + * Calculates maximum distance candidates and counts from each sample within one partition for + * the two-sample, two-sided Kolmogorov-Smirnov test implementation + * @param partData `Iterator[(Double, Boolean)]` the data in 1 partition of the co-sorted RDDs, + * each element is additionally tagged with a boolean flag for sample 1 membership + * @param n1 `Double` sample 1 size + * @param n2 `Double` sample 2 size + * @return 
`Iterator[(Double, Double, Double)]` where the first element is an unadjusted minimum + * distance , the second is an unadjusted maximum distance (both of which will later + * be adjusted by a constant to account for elements in prior partitions), and a + * count corresponding to the numerator of the adjustment constant coming from this + * partition + */ + private def searchTwoSampleCandidates( + partData: Iterator[(Double, Boolean)], + n1: Double, + n2: Double) + : Iterator[(Double, Double, Double)] = { + // fold accumulator: local minimum, local maximum, index for sample 1, index for sample2 + case class KS2Acc(min: Double, max: Double, ix1: Int, ix2: Int) + val initAcc = KS2Acc(Double.MaxValue, Double.MinValue, 0, 0) + // traverse the data in partition and calculate distances and counts + val pResults = partData.foldLeft(initAcc) { case (acc: KS2Acc, (v, isSample1)) => + val (add1, add2) = if (isSample1) (1, 0) else (0, 1) + val cdf1 = (acc.ix1 + add1) / n1 + val cdf2 = (acc.ix2 + add2) / n2 + val dist = cdf1 - cdf2 + KS2Acc(math.min(acc.min, dist), math.max(acc.max, dist), acc.ix1 + add1, acc.ix2 + add2) + } + val results = if (pResults == initAcc) { + Array[(Double, Double, Double)]() + } else { + Array((pResults.min, pResults.max, (pResults.ix1 + 1) * n2 - (pResults.ix2 + 1) * n1)) + } + results.iterator + } + + /** + * Adjust candidate extremes by the appropriate constant. The resulting maximum corresponds to + * the two-sample, two-sided Kolmogorov-Smirnov test + * @param localData `Array[(Double, Double, Double)]` contains the candidate extremes from each + * partition, along with the numerator for the necessary constant adjustments + * @param n `Double` The denominator in the constant adjustment (i.e. (size of sample 1 ) * (size + * of sample 2)) + * @return The two-sample, two-sided Kolmogorov-Smirnov statistic + */ + private def searchTwoSampleStatistic(localData: Array[(Double, Double, Double)], n: Double) + : Double = { + val initAcc = (Double.MinValue, 0.0) // maximum distance and numerator for constant adjustment + // adjust differences based on the number of elements preceding it, which should provide + // the correct distance between the 2 empirical CDFs + val results = localData.foldLeft(initAcc) { case ((prevMax, prevCt), (minCand, maxCand, ct)) => + val adjConst = prevCt / n + val dist1 = math.abs(minCand + adjConst) + val dist2 = math.abs(maxCand + adjConst) + val maxVal = Array(prevMax, dist1, dist2).max + (maxVal, prevCt + ct) + } + results._1 + } + + private def evalTwoSampleP(ksStat: Double, n: Int, m: Int): KolmogorovSmirnovTestResult = { + val pval = new KolmogorovSmirnovTest().approximateP(ksStat, n, m) + new KolmogorovSmirnovTestResult(pval, ksStat, NullHypothesis.TwoSampleTwoSided.toString) + } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala index 142b90e764a7c..2d821b8def238 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala @@ -254,4 +254,115 @@ class HypothesisTestSuite extends SparkFunSuite with MLlibTestSparkContext { assert(rCompResult.statistic ~== rKSStat relTol 1e-4) assert(rCompResult.pValue ~== rKSPVal relTol 1e-4) } + + test("2 sample Kolmogorov-Smirnov test: apache commons math3 implementation equivalence") { + // Create theoretical distributions + val stdNormalDist = new NormalDistribution(0, 1) + val normalDist 
= new NormalDistribution(2, 3) + val expDist = new ExponentialDistribution(0.6) + + // create data samples and parallelize + val n = 10000 + // local copies + val sampledStdNorm1L = stdNormalDist.sample(n) + val sampledStdNorm2L = stdNormalDist.sample(n) + val sampledNormL = normalDist.sample(n) + val sampledExpL = expDist.sample(n) + // distributed + val sampledStdNorm1P = sc.parallelize(sampledStdNorm1L, 10) + val sampledStdNorm2P = sc.parallelize(sampledStdNorm2L, 10) + val sampledNormP = sc.parallelize(sampledNormL, 10) + val sampledExpP = sc.parallelize(sampledExpL, 10) + + // Use apache math commons local KS test for verify calculations + val ksTest = new KolmogorovSmirnovTest() + val pThreshold = 0.05 + + // Comparing 2 samples from same standard normal distribution + val result1 = Statistics.kolmogorovSmirnovTest2(sampledStdNorm1P, sampledStdNorm2P) + val refStat1 = ksTest.kolmogorovSmirnovStatistic(sampledStdNorm1L, sampledStdNorm2L) + val refP1 = ksTest.kolmogorovSmirnovTest(sampledStdNorm1L, sampledStdNorm2L) + assert(result1.statistic ~== refStat1 relTol 1e-4) + assert(result1.pValue ~== refP1 relTol 1e-4) + assert(result1.pValue > pThreshold) // accept H0 + + // Comparing 2 samples from different normal distributions + val result2 = Statistics.kolmogorovSmirnovTest2(sampledStdNorm1P, sampledNormP) + val refStat2 = ksTest.kolmogorovSmirnovStatistic(sampledStdNorm1L, sampledNormL) + val refP2 = ksTest.kolmogorovSmirnovTest(sampledStdNorm1L, sampledNormL) + assert(result2.statistic ~== refStat2 relTol 1e-4) + assert(result2.pValue ~== refP2 relTol 1e-4) + assert(result2.pValue < pThreshold) // reject H0 + + // Comparing 1 sample from normal distribution to 1 sample from exponential distribution + val result3 = Statistics.kolmogorovSmirnovTest2(sampledNormP, sampledExpP) + val refStat3 = ksTest.kolmogorovSmirnovStatistic(sampledNormL, sampledExpL) + val refP3 = ksTest.kolmogorovSmirnovTest(sampledNormL, sampledExpL) + assert(result3.statistic ~== refStat3 relTol 1e-4) + assert(result3.pValue ~== refP3 relTol 1e-4) + assert(result3.pValue < pThreshold) // reject H0 + + // Creating 2 samples that don't overlap, so we are guaranteed to have some partitions + // that only include values from sample 1 and some that only include values from sample 2 + val nonOverlap1L = (1 to n).toArray.map(_.toDouble) + val nonOverlap2L = (n + 1 to 2 * n).toArray.map(_.toDouble) + val nonOverlap1P = sc.parallelize(nonOverlap1L, 20) + val nonOverlap2P = sc.parallelize(nonOverlap2L, 20) + + val result4 = Statistics.kolmogorovSmirnovTest2(nonOverlap1P, nonOverlap2P) + val refStat4 = ksTest.kolmogorovSmirnovStatistic(nonOverlap1L, nonOverlap2L) + val refP4 = ksTest.kolmogorovSmirnovTest(nonOverlap1L, nonOverlap2L) + assert(result4.statistic ~== refStat4 relTol 1e-3) + assert(result4.pValue ~== refP4 relTol 1e-3) + assert(result4.pValue < pThreshold) // reject H0 + } + + test("2 sample Kolmogorov-Smirnov test: R implementation equivalence") { + /* + Comparing results with the R implementation of KS + > sessionInfo() + R version 3.2.0 (2015-04-16) + Platform: x86_64-apple-darwin13.4.0 (64-bit) + > set.seed(20) + > v <- rnorm(20) + > v2 <- rnorm(20) + > v + [1] 1.16268529 -0.58592447 1.78546500 -1.33259371 -0.44656677 0.56960612 + [7] -2.88971761 -0.86901834 -0.46170268 -0.55554091 -0.02013537 -0.15038222 + [13] -0.62812676 1.32322085 -1.52135057 -0.43742787 0.97057758 0.02822264 + [19] -0.08578219 0.38921440 + > v2 + [1] 0.23668737 -0.14444023 0.72222970 0.36990686 -0.24206631 -1.47206332 + [7] -0.59615955 
-1.14670013 -2.47463643 -0.61350858 -0.21631151 1.59014577 + [13] 1.55614328 1.10845089 -1.09734184 -1.86060572 -0.91357885 1.24556891 + [19] 0.08785472 0.42348190 + */ + val rData1 = sc.parallelize( + Array( + 1.1626852897838, -0.585924465893051, 1.78546500331661, -1.33259371048501, + -0.446566766553219, 0.569606122374976, -2.88971761441412, -0.869018343326555, + -0.461702683149641, -0.555540910137444, -0.0201353678515895, -0.150382224136063, + -0.628126755843964, 1.32322085193283, -1.52135057001199, -0.437427868856691, + 0.970577579543399, 0.0282226444247749, -0.0857821886527593, 0.389214404984942 + ) + ) + + val rData2 = sc.parallelize( + Array( + 0.236687367712904, -0.144440226694072, 0.722229700806146, 0.369906857410192, + -0.242066314481781, -1.47206331842053, -0.596159545765696, -1.1467001312186, + -2.47463643305885, -0.613508578410268, -0.216311514038102, 1.5901457684867, + 1.55614327565194, 1.10845089348356, -1.09734184488477, -1.86060571637755, + -0.913578847977252, 1.24556891198713, 0.0878547183607045, 0.423481895050245 + ) + ) + + val rKSStat = 0.15 + val rKSPval = 0.9831 + val kSCompResult = Statistics.kolmogorovSmirnovTest2(rData1, rData2) + assert(kSCompResult.statistic ~== rKSStat relTol 1e-4) + // we're more lenient with the p-value here since the approximate p-value calculated + // by apache math commons is likely to be slightly off given the small sample size + assert(kSCompResult.pValue ~== rKSPval relTol 1e-2) + } } From 7a43f49cdf663d0f76b8131b173b0f4443e6b2db Mon Sep 17 00:00:00 2001 From: "jose.cambronero" Date: Mon, 20 Jul 2015 18:29:37 -0700 Subject: [PATCH 2/6] renamed kolmogorovSmirnov2 to kolmogorovSmirnov2Sample --- docs/mllib-statistics.md | 2 +- .../org/apache/spark/mllib/stat/Statistics.scala | 2 +- .../mllib/stat/test/KolmogorovSmirnovTest.scala | 2 +- .../spark/mllib/stat/HypothesisTestSuite.scala | 12 ++++++------ 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/docs/mllib-statistics.md b/docs/mllib-statistics.md index db8098280fe5f..cdfcf6b4d0f59 100644 --- a/docs/mllib-statistics.md +++ b/docs/mllib-statistics.md @@ -463,7 +463,7 @@ val data2: RDD[Double] = ... 
// another RDD of sample data // this corresponds to a 2-sample test // the statistic provides a test for the null hypothesis that both samples are drawn from the // same distribution -val ksTestResult2 = Statistics.kolmogorovSmirnovTest2(data, data2) +val ksTestResult2 = Statistics.kolmogorovSmirnovTest2Sample(data, data2) {% endhighlight %} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala index 50f38d7c3afcc..6e2dd9fb372d8 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala @@ -205,7 +205,7 @@ object Statistics { * @return [[org.apache.spark.mllib.stat.test.KolmogorovSmirnovTestResult]] object containing test * statistic, p-value, and null hypothesis */ - def kolmogorovSmirnovTest2(data1: RDD[Double], data2: RDD[Double]) + def kolmogorovSmirnovTest2Sample(data1: RDD[Double], data2: RDD[Double]) : KolmogorovSmirnovTestResult = { KolmogorovSmirnovTest.testTwoSamples(data1, data2) } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala index 83d76f914bd02..77212b030183a 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala @@ -53,7 +53,7 @@ private[stat] object KolmogorovSmirnovTest extends Logging { object NullHypothesis extends Enumeration { type NullHypothesis = Value val OneSampleTwoSided = Value("Sample follows theoretical distribution") - val TwoSampleTwoSided = Value("Both samples follow the same distribution.") + val TwoSampleTwoSided = Value("Both samples follow the same distribution") } /** diff --git a/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala index 2d821b8def238..cd80edec5f06b 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala @@ -274,12 +274,12 @@ class HypothesisTestSuite extends SparkFunSuite with MLlibTestSparkContext { val sampledNormP = sc.parallelize(sampledNormL, 10) val sampledExpP = sc.parallelize(sampledExpL, 10) - // Use apache math commons local KS test for verify calculations + // Use apache math commons local KS test to verify calculations val ksTest = new KolmogorovSmirnovTest() val pThreshold = 0.05 // Comparing 2 samples from same standard normal distribution - val result1 = Statistics.kolmogorovSmirnovTest2(sampledStdNorm1P, sampledStdNorm2P) + val result1 = Statistics.kolmogorovSmirnovTest2Sample(sampledStdNorm1P, sampledStdNorm2P) val refStat1 = ksTest.kolmogorovSmirnovStatistic(sampledStdNorm1L, sampledStdNorm2L) val refP1 = ksTest.kolmogorovSmirnovTest(sampledStdNorm1L, sampledStdNorm2L) assert(result1.statistic ~== refStat1 relTol 1e-4) @@ -287,7 +287,7 @@ class HypothesisTestSuite extends SparkFunSuite with MLlibTestSparkContext { assert(result1.pValue > pThreshold) // accept H0 // Comparing 2 samples from different normal distributions - val result2 = Statistics.kolmogorovSmirnovTest2(sampledStdNorm1P, sampledNormP) + val result2 = Statistics.kolmogorovSmirnovTest2Sample(sampledStdNorm1P, sampledNormP) val refStat2 = ksTest.kolmogorovSmirnovStatistic(sampledStdNorm1L, sampledNormL) val 
refP2 = ksTest.kolmogorovSmirnovTest(sampledStdNorm1L, sampledNormL) assert(result2.statistic ~== refStat2 relTol 1e-4) @@ -295,7 +295,7 @@ class HypothesisTestSuite extends SparkFunSuite with MLlibTestSparkContext { assert(result2.pValue < pThreshold) // reject H0 // Comparing 1 sample from normal distribution to 1 sample from exponential distribution - val result3 = Statistics.kolmogorovSmirnovTest2(sampledNormP, sampledExpP) + val result3 = Statistics.kolmogorovSmirnovTest2Sample(sampledNormP, sampledExpP) val refStat3 = ksTest.kolmogorovSmirnovStatistic(sampledNormL, sampledExpL) val refP3 = ksTest.kolmogorovSmirnovTest(sampledNormL, sampledExpL) assert(result3.statistic ~== refStat3 relTol 1e-4) @@ -309,7 +309,7 @@ class HypothesisTestSuite extends SparkFunSuite with MLlibTestSparkContext { val nonOverlap1P = sc.parallelize(nonOverlap1L, 20) val nonOverlap2P = sc.parallelize(nonOverlap2L, 20) - val result4 = Statistics.kolmogorovSmirnovTest2(nonOverlap1P, nonOverlap2P) + val result4 = Statistics.kolmogorovSmirnovTest2Sample(nonOverlap1P, nonOverlap2P) val refStat4 = ksTest.kolmogorovSmirnovStatistic(nonOverlap1L, nonOverlap2L) val refP4 = ksTest.kolmogorovSmirnovTest(nonOverlap1L, nonOverlap2L) assert(result4.statistic ~== refStat4 relTol 1e-3) @@ -359,7 +359,7 @@ class HypothesisTestSuite extends SparkFunSuite with MLlibTestSparkContext { val rKSStat = 0.15 val rKSPval = 0.9831 - val kSCompResult = Statistics.kolmogorovSmirnovTest2(rData1, rData2) + val kSCompResult = Statistics.kolmogorovSmirnovTest2Sample(rData1, rData2) assert(kSCompResult.statistic ~== rKSStat relTol 1e-4) // we're more lenient with the p-value here since the approximate p-value calculated // by apache math commons is likely to be slightly off given the small sample size From 88831fa86fe396c70eb01aa39a32dd699184a146 Mon Sep 17 00:00:00 2001 From: Jose Cambronero Date: Thu, 30 Jul 2015 21:11:36 -0700 Subject: [PATCH 3/6] incorporated latest feedback from @sryza --- docs/mllib-statistics.md | 6 +- .../apache/spark/mllib/stat/Statistics.scala | 2 +- .../stat/test/KolmogorovSmirnovTest.scala | 62 ++++++++++-------- .../mllib/stat/HypothesisTestSuite.scala | 63 ++++++++++++++----- 4 files changed, 90 insertions(+), 43 deletions(-) diff --git a/docs/mllib-statistics.md b/docs/mllib-statistics.md index cdfcf6b4d0f59..969d2d3312a20 100644 --- a/docs/mllib-statistics.md +++ b/docs/mllib-statistics.md @@ -431,9 +431,9 @@ user tests against the normal distribution (`distName="norm"`), but does not pro parameters, the test initializes to the standard normal distribution and logs an appropriate message. -There is also a 2-sample, 2-sided implementation available, which tests if the 2 samples are drawn -from the same distribution. It is worth noting that the test assumes that all elements -are unique, both within and across the 2 samples, and thus no ranking ties should occur. +There is also a 2-sample, 2-sided implementation available, which tests the null hypothesis that the +2 samples are drawn from the same distribution. It is worth noting that the test assumes that all +elements are unique, both within and across the 2 samples, and thus no ranking ties should occur. Given that the test is for continuous distributions this should not be an onerous requirement.
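The per-partition bookkeeping in this patch is easiest to see with concrete numbers. The counts and sample sizes below are hypothetical, chosen only to illustrate the numerator formula spelled out in the expanded scaladoc of the KolmogorovSmirnovTest.scala diff below:

{% highlight scala %}
// Hypothetical illustration of the per-partition adjustment constant.
// Suppose the co-sorted union lands in three partitions, holding
// (count from sample 1, count from sample 2) elements respectively:
val perPartition = Seq((3, 1), (0, 4), (2, 2))
val n1 = 5.0 // |sample 1|
val n2 = 7.0 // |sample 2|
// each partition contributes |sample 2| * |sample 1 in partition| -
// |sample 1| * |sample 2 in partition| to the running numerator
val numerators = perPartition.map { case (c1, c2) => c1 * n2 - c2 * n1 }
// the cumulative numerator over prior partitions, divided by |sample 1| * |sample 2|,
// is the constant by which a partition's local CDF differences are shifted
val shifts = numerators.scanLeft(0.0)(_ + _).init.map(_ / (n1 * n2))
// shifts == List(0.0, 16.0 / 35, -4.0 / 35), e.g. the second partition's local
// differences are all off by (3 * 7 - 1 * 5) / (5 * 7) = 16 / 35
{% endhighlight %}

This cumulative shift corresponds to the `adjConst` that `searchTwoSampleStatistic` applies when it folds over the collected per-partition candidates.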
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala index 6e2dd9fb372d8..e06f9b969a664 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala @@ -199,7 +199,7 @@ object Statistics { /** * Perform a two-sample, two-sided Kolmogorov-Smirnov test for probability distribution equality - * The null hypothesis corresponds to both samples coming from the same distribution + * The null hypothesis is that both samples come from the same distribution * @param data1 `RDD[Double]` first sample of data * @param data2 `RDD[Double]` second sample of data * @return [[org.apache.spark.mllib.stat.test.KolmogorovSmirnovTestResult]] object containing test diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala index 77212b030183a..3013d7f391f35 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala @@ -203,50 +203,61 @@ private[stat] object KolmogorovSmirnovTest extends Logging { def testTwoSamples(data1: RDD[Double], data2: RDD[Double]): KolmogorovSmirnovTestResult = { val n1 = data1.count().toDouble val n2 = data2.count().toDouble - val isSample1 = true // identifier for sample 1, needed after co-sort + // identifier for sample 1, needed after co-sort + val isSample1 = true // combine identified samples - val joinedData = data1.map(x => (x, isSample1)) ++ data2.map(x => (x, !isSample1)) + val unionedData = data1.map(x => (x, isSample1)) ++ data2.map(x => (x, !isSample1)) // co-sort and operate on each partition - val localData = joinedData.sortBy { case (v, id) => v }.mapPartitions { part => - searchTwoSampleCandidates(part, n1, n2) // local extrema + val localData = unionedData.sortBy { case (v, id) => v }.mapPartitions { part => + // local extrema + searchTwoSampleCandidates(part, n1, n2) }.collect() - val ksStat = searchTwoSampleStatistic(localData, n1 * n2) // result: global extreme + // result: global extreme + val ksStat = searchTwoSampleStatistic(localData, n1 * n2) evalTwoSampleP(ksStat, n1.toInt, n2.toInt) } /** - * Calculates maximum distance candidates and counts from each sample within one partition for - * the two-sample, two-sided Kolmogorov-Smirnov test implementation + * Calculates maximum distance candidates and counts of elements from each sample within one + * partition for the two-sample, two-sided Kolmogorov-Smirnov test implementation * @param partData `Iterator[(Double, Boolean)]` the data in 1 partition of the co-sorted RDDs, * each element is additionally tagged with a boolean flag for sample 1 membership * @param n1 `Double` sample 1 size * @param n2 `Double` sample 2 size * @return `Iterator[(Double, Double, Double)]` where the first element is an unadjusted minimum - * distance , the second is an unadjusted maximum distance (both of which will later - * be adjusted by a constant to account for elements in prior partitions), and a - * count corresponding to the numerator of the adjustment constant coming from this - * partition + * distance, the second is an unadjusted maximum distance (both of which will later + * be adjusted by a constant to account for elements in prior partitions), and the third is + * a count corresponding to the numerator of 
the adjustment constant coming from this + * partition. This last value, the numerator of the adjustment constant, is calculated as + * |sample 2| * |sample 1 in partition| - |sample 1| * |sample 2 in partition|. This comes + * from the fact that when we adjust for prior partitions, what we are doing is + * adding the difference of the fractions (|prior elements in sample 1| / |sample 1| - + * |prior elements in sample 2| / |sample 2|). We simply keep track of the numerator + * portion that is attributable to each partition so that following partitions can + * use it to cumulatively adjust their values. */ private def searchTwoSampleCandidates( partData: Iterator[(Double, Boolean)], n1: Double, - n2: Double) - : Iterator[(Double, Double, Double)] = { + n2: Double): Iterator[(Double, Double, Double)] = { // fold accumulator: local minimum, local maximum, index for sample 1, index for sample2 - case class KS2Acc(min: Double, max: Double, ix1: Int, ix2: Int) - val initAcc = KS2Acc(Double.MaxValue, Double.MinValue, 0, 0) - // traverse the data in partition and calculate distances and counts - val pResults = partData.foldLeft(initAcc) { case (acc: KS2Acc, (v, isSample1)) => + case class ExtremaAndIndices(min: Double, max: Double, ix1: Int, ix2: Int) + val initAcc = ExtremaAndIndices(Double.MaxValue, Double.MinValue, 0, 0) + // traverse the data in the partition and calculate distances and counts + val pResults = partData.foldLeft(initAcc) { case (acc: ExtremaAndIndices, (v, isSample1)) => val (add1, add2) = if (isSample1) (1, 0) else (0, 1) val cdf1 = (acc.ix1 + add1) / n1 val cdf2 = (acc.ix2 + add2) / n2 val dist = cdf1 - cdf2 - KS2Acc(math.min(acc.min, dist), math.max(acc.max, dist), acc.ix1 + add1, acc.ix2 + add2) + ExtremaAndIndices( + math.min(acc.min, dist), + math.max(acc.max, dist), + acc.ix1 + add1, acc.ix2 + add2) } val results = if (pResults == initAcc) { Array[(Double, Double, Double)]() } else { - Array((pResults.min, pResults.max, (pResults.ix1 + 1) * n2 - (pResults.ix2 + 1) * n1)) + Array((pResults.min, pResults.max, (pResults.ix1 + 1) * n2 - (pResults.ix2 + 1) * n1)) } results.iterator } @@ -262,15 +273,16 @@ private[stat] object KolmogorovSmirnovTest extends Logging { */ private def searchTwoSampleStatistic(localData: Array[(Double, Double, Double)], n: Double) : Double = { - val initAcc = (Double.MinValue, 0.0) // maximum distance and numerator for constant adjustment + // maximum distance and numerator for constant adjustment + val initAcc = (Double.MinValue, 0.0) // adjust differences based on the number of elements preceding it, which should provide // the correct distance between the 2 empirical CDFs val results = localData.foldLeft(initAcc) { case ((prevMax, prevCt), (minCand, maxCand, ct)) => - val adjConst = prevCt / n - val dist1 = math.abs(minCand + adjConst) - val dist2 = math.abs(maxCand + adjConst) - val maxVal = Array(prevMax, dist1, dist2).max - (maxVal, prevCt + ct) + val adjConst = prevCt / n + val dist1 = math.abs(minCand + adjConst) + val dist2 = math.abs(maxCand + adjConst) + val maxVal = Array(prevMax, dist1, dist2).max + (maxVal, prevCt + ct) } results._1 } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala index cd80edec5f06b..caa4ac14328d4 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala @@ -301,20 +301,6 @@ class 
HypothesisTestSuite extends SparkFunSuite with MLlibTestSparkContext { assert(result3.statistic ~== refStat3 relTol 1e-4) assert(result3.pValue ~== refP3 relTol 1e-4) assert(result3.pValue < pThreshold) // reject H0 - - // Creating 2 samples that don't overlap, so we are guaranteed to have some partitions - // that only include values from sample 1 and some that only include values from sample 2 - val nonOverlap1L = (1 to n).toArray.map(_.toDouble) - val nonOverlap2L = (n + 1 to 2 * n).toArray.map(_.toDouble) - val nonOverlap1P = sc.parallelize(nonOverlap1L, 20) - val nonOverlap2P = sc.parallelize(nonOverlap2L, 20) - - val result4 = Statistics.kolmogorovSmirnovTest2Sample(nonOverlap1P, nonOverlap2P) - val refStat4 = ksTest.kolmogorovSmirnovStatistic(nonOverlap1L, nonOverlap2L) - val refP4 = ksTest.kolmogorovSmirnovTest(nonOverlap1L, nonOverlap2L) - assert(result4.statistic ~== refStat4 relTol 1e-3) - assert(result4.pValue ~== refP4 relTol 1e-3) - assert(result4.pValue < pThreshold) // reject H0 } test("2 sample Kolmogorov-Smirnov test: R implementation equivalence") { @@ -365,4 +351,53 @@ class HypothesisTestSuite extends SparkFunSuite with MLlibTestSparkContext { // by apache math commons is likely to be slightly off given the small sample size assert(kSCompResult.pValue ~== rKSPval relTol 1e-2) } + + test("2 sample Kolmogorov-Smirnov test: partitions with no data") { + // we use the R data provided in the prior test + // We request a number of partitions larger than the number of elements in the data sets + // wich + val rData1 = sc.parallelize( + Array( + 1.1626852897838, -0.585924465893051, 1.78546500331661, -1.33259371048501, + -0.446566766553219, 0.569606122374976, -2.88971761441412, -0.869018343326555, + -0.461702683149641, -0.555540910137444, -0.0201353678515895, -0.150382224136063, + -0.628126755843964, 1.32322085193283, -1.52135057001199, -0.437427868856691, + 0.970577579543399, 0.0282226444247749, -0.0857821886527593, 0.389214404984942 + ), 40) + + val rData2 = sc.parallelize( + Array( + 0.236687367712904, -0.144440226694072, 0.722229700806146, 0.369906857410192, + -0.242066314481781, -1.47206331842053, -0.596159545765696, -1.1467001312186, + -2.47463643305885, -0.613508578410268, -0.216311514038102, 1.5901457684867, + 1.55614327565194, 1.10845089348356, -1.09734184488477, -1.86060571637755, + -0.913578847977252, 1.24556891198713, 0.0878547183607045, 0.423481895050245 + ), 40) + + val rKSStat = 0.15 + val rKSPval = 0.9831 + val kSCompResult = Statistics.kolmogorovSmirnovTest2Sample(rData1, rData2) + assert(kSCompResult.statistic ~== rKSStat relTol 1e-4) + } + + test("2 sample Kolmogorov-Smirnov test: partitions with just data from one sample") { + // Creating 2 samples that don't overlap, so we are guaranteed to have some partitions + // that only include values from sample 1 and some that only include values from sample 2 + val n = 1000 + val nonOverlap1L = (1 to n).toArray.map(_.toDouble) + val nonOverlap2L = (n + 1 to 2 * n).toArray.map(_.toDouble) + val nonOverlap1P = sc.parallelize(nonOverlap1L, 20) + val nonOverlap2P = sc.parallelize(nonOverlap2L, 20) + + // Use apache math commons local KS test to verify calculations + val ksTest = new KolmogorovSmirnovTest() + val pThreshold = 0.05 + + val result4 = Statistics.kolmogorovSmirnovTest2Sample(nonOverlap1P, nonOverlap2P) + val refStat4 = ksTest.kolmogorovSmirnovStatistic(nonOverlap1L, nonOverlap2L) + val refP4 = ksTest.kolmogorovSmirnovTest(nonOverlap1L, nonOverlap2L) + assert(result4.statistic ~== refStat4 relTol 1e-3) + 
assert(result4.pValue ~== refP4 relTol 1e-3)
+    assert(result4.pValue < pThreshold) // reject H0
+  }
 }

From 748e9d308e2864c8a99ccf5ec3cfed440a559235 Mon Sep 17 00:00:00 2001
From: "jose.cambronero" 
Date: Fri, 31 Jul 2015 14:27:23 -0700
Subject: [PATCH 4/6] incorporated PR feedback from @sryza and @srowen. Merged
 in upstream to account for aliasing of commons' KS test

---
 .../mllib/stat/test/KolmogorovSmirnovTest.scala | 15 +++++++--------
 .../spark/mllib/stat/HypothesisTestSuite.scala  |  8 ++++----
 2 files changed, 11 insertions(+), 12 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala
index ea6b644efc73f..e1bf2a58a4589 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala
@@ -206,12 +206,11 @@ private[stat] object KolmogorovSmirnovTest extends Logging {
     // identifier for sample 1, needed after co-sort
     val isSample1 = true
     // combine identified samples
-    val unionedData = data1.map(x => (x, isSample1)) ++ data2.map(x => (x, !isSample1))
-    // co-sort and operate on each partition
-    val localData = unionedData.sortBy { case (v, id) => v }.mapPartitions { part =>
-      // local extrema
-      searchTwoSampleCandidates(part, n1, n2)
-    }.collect()
+    val unionedData = data1.map((_, isSample1)).union(data2.map((_, !isSample1)))
+    // co-sort and operate on each partition, returning local extrema to the driver
+    val localData = unionedData.sortByKey().mapPartitions(
+      searchTwoSampleCandidates(_, n1, n2)
+    ).collect()
     // result: global extreme
     val ksStat = searchTwoSampleStatistic(localData, n1 * n2)
     evalTwoSampleP(ksStat, n1.toInt, n2.toInt)
@@ -244,7 +243,7 @@ private[stat] object KolmogorovSmirnovTest extends Logging {
     case class ExtremaAndIndices(min: Double, max: Double, ix1: Int, ix2: Int)
     val initAcc = ExtremaAndIndices(Double.MaxValue, Double.MinValue, 0, 0)
     // traverse the data in the partition and calculate distances and counts
-    val pResults = partData.foldLeft(initAcc) { case (acc: ExtremaAndIndices, (v, isSample1)) =>
+    val pResults = partData.foldLeft(initAcc) { case (acc, (v, isSample1)) =>
       val (add1, add2) = if (isSample1) (1, 0) else (0, 1)
       val cdf1 = (acc.ix1 + add1) / n1
       val cdf2 = (acc.ix2 + add2) / n2
@@ -288,7 +287,7 @@ private[stat] object KolmogorovSmirnovTest extends Logging {
   }
 
   private def evalTwoSampleP(ksStat: Double, n: Int, m: Int): KolmogorovSmirnovTestResult = {
-    val pval = new KolmogorovSmirnovTest().approximateP(ksStat, n, m)
+    val pval = new CommonMathKolmogorovSmirnovTest().approximateP(ksStat, n, m)
     new KolmogorovSmirnovTestResult(pval, ksStat, NullHypothesis.TwoSampleTwoSided.toString)
   }
 }
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala
index caa4ac14328d4..304feafa25977 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala
@@ -21,7 +21,7 @@ import java.util.Random
 
 import org.apache.commons.math3.distribution.{ExponentialDistribution, NormalDistribution,
   UniformRealDistribution}
-import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest
+import org.apache.commons.math3.stat.inference.{KolmogorovSmirnovTest => CommonMathKolmogorovSmirnovTest}
 import org.apache.spark.{SparkException, SparkFunSuite}
 import org.apache.spark.mllib.linalg.{DenseVector, Matrices, Vectors}
@@ -177,7 +177,7 @@ class HypothesisTestSuite extends SparkFunSuite with MLlibTestSparkContext {
     val sampledUnif = sc.parallelize(unifDist.sample(n), 10)
 
     // Use an apache math commons local KS test to verify calculations
-    val ksTest = new KolmogorovSmirnovTest()
+    val ksTest = new CommonMathKolmogorovSmirnovTest()
     val pThreshold = 0.05
 
     // Comparing a standard normal sample to a standard normal distribution
@@ -275,7 +275,7 @@ class HypothesisTestSuite extends SparkFunSuite with MLlibTestSparkContext {
     val sampledExpP = sc.parallelize(sampledExpL, 10)
 
     // Use apache math commons local KS test to verify calculations
-    val ksTest = new KolmogorovSmirnovTest()
+    val ksTest = new CommonMathKolmogorovSmirnovTest()
     val pThreshold = 0.05
 
     // Comparing 2 samples from same standard normal distribution
@@ -390,7 +390,7 @@ class HypothesisTestSuite extends SparkFunSuite with MLlibTestSparkContext {
     val nonOverlap2P = sc.parallelize(nonOverlap2L, 20)
 
     // Use apache math commons local KS test to verify calculations
-    val ksTest = new KolmogorovSmirnovTest()
+    val ksTest = new CommonMathKolmogorovSmirnovTest()
     val pThreshold = 0.05
 
     val result4 = Statistics.kolmogorovSmirnovTest2Sample(nonOverlap1P, nonOverlap2P)

From 16ba96e00c5886df187467541c5189dce1bf92dc Mon Sep 17 00:00:00 2001
From: "jose.cambronero" 
Date: Mon, 3 Aug 2015 19:10:00 -0700
Subject: [PATCH 5/6] further PR feedback from @sryza. Modified tests now that
 certain critical functions are package private and can be tested more
 directly

---
 .../stat/test/KolmogorovSmirnovTest.scala     | 37 +++++++---
 .../mllib/stat/HypothesisTestSuite.scala      | 71 +++++++++++--------
 2 files changed, 70 insertions(+), 38 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala
index e1bf2a58a4589..fb0ce56dccc6f 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala
@@ -46,13 +46,25 @@ import org.apache.spark.rdd.RDD
  * partition, we can collect and operate locally. Locally, we can now adjust each distance by the
  * appropriate constant (the cumulative sum of number of elements in the prior partitions divided by
  * the data set size). Finally, we take the maximum absolute value, and this is the statistic.
+ *
+ * In the case of the 2-sample variant, the approach is slightly different. We calculate 2
+ * empirical CDFs corresponding to the distribution under sample 1 and under sample 2. Within each
+ * partition, we can calculate the maximum difference of the local empirical CDFs, which is off from
+ * the global value by some constant. Similarly to the 1-sample variant, we can simply adjust this
+ * difference once we have collected the possible candidate extrema. However, in this case we don't
+ * collect the number of elements in a partition, but rather an adjustment constant, that we can
+ * cumulatively sum once we've collected results on the driver, and that when divided by
+ * |sample 1| * |sample 2| provides the adjustment necessary to the difference between the 2
+ * empirical CDFs in a given partition and thus the adjustment necessary to the potential extrema
+ * candidates. 
The constant that we collect per partition thus corresponds to + * |sample 2| * |sample 1 in partition| - |sample 1| * |sample 2 in partition|. */ private[stat] object KolmogorovSmirnovTest extends Logging { // Null hypothesis for the type of KS test to be included in the result. object NullHypothesis extends Enumeration { type NullHypothesis = Value - val OneSampleTwoSided = Value("Sample follows theoretical distribution") + val OneSampleTwoSided = Value("Sample follows the theoretical distribution") val TwoSampleTwoSided = Value("Both samples follow the same distribution") } @@ -218,7 +230,8 @@ private[stat] object KolmogorovSmirnovTest extends Logging { /** * Calculates maximum distance candidates and counts of elements from each sample within one - * partition for the two-sample, two-sided Kolmogorov-Smirnov test implementation + * partition for the two-sample, two-sided Kolmogorov-Smirnov test implementation. Function + * is package private for testing convenience. * @param partData `Iterator[(Double, Boolean)]` the data in 1 partition of the co-sorted RDDs, * each element is additionally tagged with a boolean flag for sample 1 membership * @param n1 `Double` sample 1 size @@ -235,24 +248,27 @@ private[stat] object KolmogorovSmirnovTest extends Logging { * portion that is attributable to each partition so that following partitions can * use it to cumulatively adjust their values. */ - private def searchTwoSampleCandidates( + private[stat] def searchTwoSampleCandidates( partData: Iterator[(Double, Boolean)], n1: Double, n2: Double): Iterator[(Double, Double, Double)] = { // fold accumulator: local minimum, local maximum, index for sample 1, index for sample2 - case class ExtremaAndIndices(min: Double, max: Double, ix1: Int, ix2: Int) - val initAcc = ExtremaAndIndices(Double.MaxValue, Double.MinValue, 0, 0) + case class ExtremaAndRunningIndices(min: Double, max: Double, ix1: Int, ix2: Int) + val initAcc = ExtremaAndRunningIndices(Double.MaxValue, Double.MinValue, 0, 0) // traverse the data in the partition and calculate distances and counts val pResults = partData.foldLeft(initAcc) { case (acc, (v, isSample1)) => val (add1, add2) = if (isSample1) (1, 0) else (0, 1) val cdf1 = (acc.ix1 + add1) / n1 val cdf2 = (acc.ix2 + add2) / n2 val dist = cdf1 - cdf2 - ExtremaAndIndices( + ExtremaAndRunningIndices( math.min(acc.min, dist), math.max(acc.max, dist), - acc.ix1 + add1, acc.ix2 + add2) + acc.ix1 + add1, acc.ix2 + add2 + ) } + // If partition has no data, then pResults will match the fold accumulator + // we must filter this out to avoid having the statistic spoiled by the accumulation values val results = if (pResults == initAcc) { Array[(Double, Double, Double)]() } else { @@ -263,14 +279,15 @@ private[stat] object KolmogorovSmirnovTest extends Logging { /** * Adjust candidate extremes by the appropriate constant. The resulting maximum corresponds to - * the two-sample, two-sided Kolmogorov-Smirnov test + * the two-sample, two-sided Kolmogorov-Smirnov test. Function is package private for testing + * convenience. * @param localData `Array[(Double, Double, Double)]` contains the candidate extremes from each * partition, along with the numerator for the necessary constant adjustments * @param n `Double` The denominator in the constant adjustment (i.e. 
(size of sample 1 ) * (size
   * of sample 2))
   * @return The two-sample, two-sided Kolmogorov-Smirnov statistic
   */
-  private def searchTwoSampleStatistic(localData: Array[(Double, Double, Double)], n: Double)
+  private[stat] def searchTwoSampleStatistic(localData: Array[(Double, Double, Double)], n: Double)
     : Double = {
     // maximum distance and numerator for constant adjustment
     val initAcc = (Double.MinValue, 0.0)
@@ -282,7 +299,7 @@ private[stat] object KolmogorovSmirnovTest extends Logging {
       val dist2 = math.abs(maxCand + adjConst)
       val maxVal = Array(prevMax, dist1, dist2).max
       (maxVal, prevCt + ct)
-      }
+    }
     results._1
   }
 
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala
index 304feafa25977..f7ef843d600fa 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala
@@ -26,7 +26,7 @@ import org.apache.commons.math3.stat.inference.{KolmogorovSmirnovTest => CommonM
 import org.apache.spark.{SparkException, SparkFunSuite}
 import org.apache.spark.mllib.linalg.{DenseVector, Matrices, Vectors}
 import org.apache.spark.mllib.regression.LabeledPoint
-import org.apache.spark.mllib.stat.test.ChiSqTest
+import org.apache.spark.mllib.stat.test.{ChiSqTest, KolmogorovSmirnovTest}
 import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.apache.spark.mllib.util.TestingUtils._
 
@@ -352,52 +352,67 @@ class HypothesisTestSuite extends SparkFunSuite with MLlibTestSparkContext {
     assert(kSCompResult.pValue ~== rKSPval relTol 1e-2)
   }
 
-  test("2 sample Kolmogorov-Smirnov test: partitions with no data") {
+  test("2 sample Kolmogorov-Smirnov test: helper functions in case partitions have no data") {
     // we use the R data provided in the prior test
-    // We request a number of partitions larger than the number of elements in the data sets
-    // wich
+    // Once we have combined and sorted, we partition with a larger number than
+    // the number of elements to guarantee we have empty partitions.
+    // We test various critical package private functions in this circumstance.
+    val rData1 = Array(
        1.1626852897838, -0.585924465893051, 1.78546500331661, -1.33259371048501,
        -0.446566766553219, 0.569606122374976, -2.88971761441412, -0.869018343326555,
        -0.461702683149641, -0.555540910137444, -0.0201353678515895, -0.150382224136063,
        -0.628126755843964, 1.32322085193283, -1.52135057001199, -0.437427868856691,
        0.970577579543399, 0.0282226444247749, -0.0857821886527593, 0.389214404984942
-      ), 40)
+    )
 
-    val rData2 = sc.parallelize(
-      Array(
+    val rData2 = Array(
        0.236687367712904, -0.144440226694072, 0.722229700806146, 0.369906857410192,
        -0.242066314481781, -1.47206331842053, -0.596159545765696, -1.1467001312186,
        -2.47463643305885, -0.613508578410268, -0.216311514038102, 1.5901457684867,
        1.55614327565194, 1.10845089348356, -1.09734184488477, -1.86060571637755,
        -0.913578847977252, 1.24556891198713, 0.0878547183607045, 0.423481895050245
-      ), 40)
+    )
+
+
+    val n1 = rData1.length
+    val n2 = rData2.length
+    val unioned = (rData1.map((_, true)) ++ rData2.map((_, false))).sortBy(_._1)
+    val parallel = sc.parallelize(unioned, 100)
+    // verify that there are empty partitions
+    assert(parallel.mapPartitions(x => Array(x.size).iterator).collect().contains(0))
+    val localExtrema = parallel.mapPartitions(
+      KolmogorovSmirnovTest.searchTwoSampleCandidates(_, n1, n2)
+    ).collect()
+    val ksCompStat = KolmogorovSmirnovTest.searchTwoSampleStatistic(localExtrema, n1 * n2)
 
     val rKSStat = 0.15
-    val rKSPval = 0.9831
-    val kSCompResult = Statistics.kolmogorovSmirnovTest2Sample(rData1, rData2)
-    assert(kSCompResult.statistic ~== rKSStat relTol 1e-4)
+    assert(ksCompStat ~== rKSStat relTol 1e-4)
   }
 
-  test("2 sample Kolmogorov-Smirnov test: partitions with just data from one sample") {
-    // Creating 2 samples that don't overlap, so we are guaranteed to have some partitions
-    // that only include values from sample 1 and some that only include values from sample 2
-    val n = 1000
-    val nonOverlap1L = (1 to n).toArray.map(_.toDouble)
-    val nonOverlap2L = (n + 1 to 2 * n).toArray.map(_.toDouble)
-    val nonOverlap1P = sc.parallelize(nonOverlap1L, 20)
-    val nonOverlap2P = sc.parallelize(nonOverlap2L, 20)
+  test("2 sample Kolmogorov-Smirnov test: helper functions in case partitions have only 1 sample") {
+    // Creating 2 samples that don't overlap and requesting a large number of partitions to
+    // guarantee that there will be partitions with only data from 1 sample. We test critical
+    // helper functions in these circumstances.
+    val n = 100
+    val lower = (1 to n).toArray.map(_.toDouble)
+    val upper = (1 to n).toArray.map(n + _.toDouble * 100)
+
+    val unioned = (lower.map((_, true)) ++ upper.map((_, false))).sortBy(_._1)
+    val parallel = sc.parallelize(unioned, 200)
+    // verify that there is at least 1 partition with only 1 sample
+    assert(parallel.mapPartitions(x =>
+      Array(x.toArray.map(_._2).distinct.length).iterator
+    ).collect().contains(1)
+    )
+    val localExtrema = parallel.mapPartitions(
+      KolmogorovSmirnovTest.searchTwoSampleCandidates(_, n, n)
+    ).collect()
+    val ksCompStat = KolmogorovSmirnovTest.searchTwoSampleStatistic(localExtrema, n * n)
 
     // Use apache math commons local KS test to verify calculations
     val ksTest = new CommonMathKolmogorovSmirnovTest()
-    val pThreshold = 0.05
 
-    val result4 = Statistics.kolmogorovSmirnovTest2Sample(nonOverlap1P, nonOverlap2P)
-    val refStat4 = ksTest.kolmogorovSmirnovStatistic(nonOverlap1L, nonOverlap2L)
-    val refP4 = ksTest.kolmogorovSmirnovTest(nonOverlap1L, nonOverlap2L)
-    assert(result4.statistic ~== refStat4 relTol 1e-3)
-    assert(result4.pValue ~== refP4 relTol 1e-3)
-    assert(result4.pValue < pThreshold) // reject H0
+    val refStat4 = ksTest.kolmogorovSmirnovStatistic(lower, upper)
+    assert(ksCompStat ~== refStat4 relTol 1e-3)
   }
 }

From 6a2768291c3ae5d6f073d6f79b6f2e05fbbbab5c Mon Sep 17 00:00:00 2001
From: "jose.cambronero" 
Date: Wed, 5 Aug 2015 10:55:50 -0700
Subject: [PATCH 6/6] minor grammar fix in TwoSampleTwoSided Null hypothesis,
 fixes pyspark unit test failure by reversing prior grammar change

---
 .../apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala
index fb0ce56dccc6f..8e9652eb4466a 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala
@@ -64,8 +64,8 @@ private[stat] object KolmogorovSmirnovTest extends Logging {
   // Null hypothesis for the type of KS test to be included in the result.
   object NullHypothesis extends Enumeration {
     type NullHypothesis = Value
-    val OneSampleTwoSided = Value("Sample follows the theoretical distribution")
-    val TwoSampleTwoSided = Value("Both samples follow the same distribution")
+    val OneSampleTwoSided = Value("Sample follows theoretical distribution")
+    val TwoSampleTwoSided = Value("Both samples follow same distribution")
   }
 
 /**