From 491601600a09de1f7f3b46b7459e320d4933cb08 Mon Sep 17 00:00:00 2001
From: Dan McClary
Date: Mon, 10 Mar 2014 23:58:32 -0700
Subject: [PATCH 1/6] added histogram method, added max and min to statscounter

---
 python/pyspark/rdd.py         | 56 +++++++++++++++++++++++++++++++++++
 python/pyspark/statcounter.py | 23 ++++++++++++--
 2 files changed, 77 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index be23f87f5ed2d..3fff57ea35257 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -24,6 +24,7 @@
 import sys
 import shlex
 import traceback
+from bisect import bisect_right
 from subprocess import Popen, PIPE
 from tempfile import NamedTemporaryFile
 from threading import Thread
@@ -534,6 +535,7 @@ def func(iterator):
         return reduce(op, vals, zeroValue)

     # TODO: aggregate
+
     def sum(self):
         """
@@ -610,6 +612,60 @@ def sampleVariance(self):
         """
         return self.stats().sampleVariance()

+    def getBuckets(self, bucketCount):
+        """
+        Compute a histogram of the data using bucketCount number of buckets
+        evenly spaced between the min and max of the RDD.
+
+        >>> sc.parallelize([1,49, 23, 100, 75, 50]).histogram()
+        {(0,49):3, (50, 100):3}
+        """
+
+        #use the statscounter as a quick way of getting max and min
+        mm_stats = self.stats()
+        min = mm_stats.min()
+        max = mm_stats.max()
+
+        increment = (max-min)/bucketCount
+        buckets = range(min,min)
+        if increment != 0:
+            buckets = range(min,max, increment)
+
+        return buckets
+
+    def histogram(self, bucketCount, buckets=None):
+        evenBuckets = False
+        if not buckets:
+            buckets = self.getBuckets(bucketCount)
+        if len(buckets) < 2:
+            raise ValueError("requires more than 1 bucket")
+        if len(buckets) % 2 == 0:
+            evenBuckets = True
+        # histogram partition
+        def histogramPartition(iterator):
+            counters = defaultdict(int)
+            for obj in iterator:
+                k = bisect_right(buckets, obj)
+                if k < len(buckets) and k > 0:
+                    key = (buckets[k-1], buckets[k]-1)
+                elif k == len(buckets):
+                    key = (buckets[k-1], float("inf"))
+                elif k == 0:
+                    key = (float("-inf"), buckets[k]-1)
+                counters[key] += 1
+            yield counters

+        # merge counters
+        def mergeCounters(d1, d2):
+            for k in d2.keys():
+                if k in d1:
+                    d1[k] += d2[k]
+            return d1
+
+        # mapPartitions(histogramPartition).reduce(mergeCounters)
+        return self.mapPartitions(histogramPartition).reduce(mergeCounters)
+
+
     def countByValue(self):
         """
         Return the count of each unique value in this RDD as a dictionary of

diff --git a/python/pyspark/statcounter.py b/python/pyspark/statcounter.py
index 8e1cbd4ad9856..ee75c414dcbf6 100644
--- a/python/pyspark/statcounter.py
+++ b/python/pyspark/statcounter.py
@@ -26,7 +26,9 @@ def __init__(self, values=[]):
         self.n = 0L  # Running count of our values
         self.mu = 0.0  # Running mean of our values
         self.m2 = 0.0  # Running variance numerator (sum of (x - mean)^2)
-
+        self.max_v = float("-inf")
+        self.min_v = float("inf")
+
         for v in values:
             self.merge(v)

@@ -36,6 +38,11 @@ def merge(self, value):
         self.n += 1
         self.mu += delta / self.n
         self.m2 += delta * (value - self.mu)
+        if self.max_v < value:
+            self.max_v = value
+        if self.min_v > value:
+            self.min_v = value
+
         return self

     # Merge another StatCounter into this one, adding up the internal statistics.
@@ -49,7 +56,10 @@ def mergeStats(self, other):
         if self.n == 0:
             self.mu = other.mu
             self.m2 = other.m2
-            self.n = other.n
+            self.n = other.n
+            self.max_v = other.max_v
+            self.min_v = other.min_v
+
         elif other.n != 0:
             delta = other.mu - self.mu
             if other.n * 10 < self.n:
@@ -58,6 +68,9 @@ def mergeStats(self, other):
                 self.mu = other.mu - (delta * self.n) / (self.n + other.n)
             else:
                 self.mu = (self.mu * self.n + other.mu * other.n) / (self.n + other.n)
+
+            self.max_v = max(self.max_v, other.max_v)
+            self.min_v = min(self.min_v, other.min_v)
             self.m2 += other.m2 + (delta * delta * self.n * other.n) / (self.n + other.n)
             self.n += other.n

@@ -76,6 +89,12 @@ def mean(self):
     def sum(self):
         return self.n * self.mu

+    def min(self):
+        return self.min_v
+
+    def max(self):
+        return self.max_v
+
     # Return the variance of the values.
     def variance(self):
         if self.n == 0:
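The patch above builds the histogram as a two-step distributed aggregation: histogramPartition turns each partition into a defaultdict of bucket counts keyed by (low, high) boundary pairs, and the dicts are then reduced pairwise with mergeCounters. Below is a minimal local sketch of the same pattern, assuming plain Python 2 and no SparkContext; histogram_partition, merge_counters, and parts are illustrative names. One behavioral note: mergeCounters above skips keys absent from d1, so a bucket seen by only one partition loses its count, while the sketch's merge keeps such keys.

    from bisect import bisect_right
    from collections import defaultdict

    def histogram_partition(iterator, buckets):
        # Key each count by an (inclusive low, inclusive high) boundary
        # pair, mirroring the bisect_right logic in the patch.
        counters = defaultdict(int)
        for obj in iterator:
            k = bisect_right(buckets, obj)
            if 0 < k < len(buckets):
                key = (buckets[k - 1], buckets[k] - 1)
            elif k == len(buckets):
                key = (buckets[k - 1], float("inf"))
            else:  # k == 0: the value sits below the lowest boundary
                key = (float("-inf"), buckets[k] - 1)
            counters[key] += 1
        yield counters

    def merge_counters(d1, d2):
        for k in d2:
            d1[k] = d1.get(k, 0) + d2[k]  # also keeps keys d1 has not seen
        return d1

    parts = [[1, 49, 23], [100, 75, 50]]  # two mock "partitions"
    counts = reduce(merge_counters,
                    (next(histogram_partition(p, [0, 50, 100])) for p in parts))
    print dict(counts)  # {(0, 49): 3, (50, 99): 2, (100, inf): 1} (key order may vary)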
From eaf89d957e84d3b926f6c5f3f65acb8764c7ec2f Mon Sep 17 00:00:00 2001
From: Dan McClary
Date: Tue, 11 Mar 2014 00:34:30 -0700
Subject: [PATCH 2/6] added correct doctest for histogram

---
 python/pyspark/rdd.py | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 3fff57ea35257..bef32c1f37dfa 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -613,14 +613,6 @@ def sampleVariance(self):
         return self.stats().sampleVariance()

     def getBuckets(self, bucketCount):
-        """
-        Compute a histogram of the data using bucketCount number of buckets
-        evenly spaced between the min and max of the RDD.
-
-        >>> sc.parallelize([1,49, 23, 100, 75, 50]).histogram()
-        {(0,49):3, (50, 100):3}
-        """
-
         #use the statscounter as a quick way of getting max and min
         mm_stats = self.stats()
         min = mm_stats.min()
@@ -634,6 +626,14 @@ def getBuckets(self, bucketCount):
         return buckets

     def histogram(self, bucketCount, buckets=None):
+       """
+        Compute a histogram of the data using bucketCount number of buckets
+        evenly spaced between the min and max of the RDD.
+
+        >>> sc.parallelize([1,49, 23, 100, 12, 13, 20, 22, 75, 50]).histogram(3)
+        defaultdict(<type 'int'>, {(67, inf): 2, (1, 33): 6, (34, 66): 2})
+        """
+
         evenBuckets = False
         if not buckets:
             buckets = self.getBuckets(bucketCount)

From 29981f20c28101a34bb15e939062dc24677c4773 Mon Sep 17 00:00:00 2001
From: Dan McClary
Date: Tue, 11 Mar 2014 00:57:33 -0700
Subject: [PATCH 3/6] fixed indentation on doctest comment

---
 python/pyspark/rdd.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index bef32c1f37dfa..ef9d5638c7d60 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -626,7 +626,7 @@ def getBuckets(self, bucketCount):
         return buckets

     def histogram(self, bucketCount, buckets=None):
-       """
+        """
         Compute a histogram of the data using bucketCount number of buckets
         evenly spaced between the min and max of the RDD.
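The mergeStats hunk in [PATCH 1/6] combines two running (n, mu, m2) summaries without a second pass over the data, the pairwise form of Welford's online algorithm (Chan et al.), and the new max_v/min_v fields merge with a plain max/min. A standalone sketch of that combination step, in plain Python with the illustrative name MiniStat:

    class MiniStat(object):
        def __init__(self, values=()):
            self.n, self.mu, self.m2 = 0, 0.0, 0.0  # count, mean, sum of squared deviations
            self.max_v, self.min_v = float("-inf"), float("inf")
            for v in values:
                self.merge(v)

        def merge(self, value):
            # Welford's one-value update.
            self.n += 1
            delta = value - self.mu
            self.mu += delta / self.n
            self.m2 += delta * (value - self.mu)
            self.max_v = max(self.max_v, value)
            self.min_v = min(self.min_v, value)
            return self

        def merge_stats(self, other):
            # Chan et al.'s combination of two partial summaries.
            if other.n == 0:
                return self
            if self.n == 0:
                self.n, self.mu, self.m2 = other.n, other.mu, other.m2
                self.max_v, self.min_v = other.max_v, other.min_v
                return self
            delta = other.mu - self.mu
            total = float(self.n + other.n)
            self.mu = (self.mu * self.n + other.mu * other.n) / total
            self.m2 += other.m2 + delta * delta * self.n * other.n / total
            self.max_v = max(self.max_v, other.max_v)
            self.min_v = min(self.min_v, other.min_v)
            self.n += other.n
            return self

    s = MiniStat([1.0, 5.0]).merge_stats(MiniStat([43.0, 10.0]))
    print s.n, s.mu, s.min_v, s.max_v  # 4 14.75 1.0 43.0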
From 37a7deabb2bfffd7ced1c15d6a079e942b73e7b6 Mon Sep 17 00:00:00 2001
From: Dan McClary
Date: Tue, 11 Mar 2014 11:44:22 -0700
Subject: [PATCH 4/6] cleaned up boundaries for histogram -- uses real min/max
 when buckets are derived

---
 python/pyspark/rdd.py | 15 ++++++++++-----
 1 file changed, 10 insertions(+), 5 deletions(-)

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index ef9d5638c7d60..9cfc2a8028ce8 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -623,7 +623,7 @@ def getBuckets(self, bucketCount):
         if increment != 0:
             buckets = range(min,max, increment)

-        return buckets
+        return {"min":min, "max":max, "buckets":buckets}

     def histogram(self, bucketCount, buckets=None):
         """
@@ -633,10 +633,15 @@ def histogram(self, bucketCount, buckets=None):
         >>> sc.parallelize([1,49, 23, 100, 12, 13, 20, 22, 75, 50]).histogram(3)
         defaultdict(<type 'int'>, {(67, inf): 2, (1, 33): 6, (34, 66): 2})
         """
-
+        min = float("-inf")
+        max = float("inf")
         evenBuckets = False
         if not buckets:
-            buckets = self.getBuckets(bucketCount)
+            b = self.getBuckets(bucketCount)
+            buckets = b["buckets"]
+            min = b["min"]
+            max = b["max"]
+
         if len(buckets) < 2:
             raise ValueError("requires more than 1 bucket")
         if len(buckets) % 2 == 0:
@@ -649,9 +654,9 @@ def histogramPartition(iterator):
                 if k < len(buckets) and k > 0:
                     key = (buckets[k-1], buckets[k]-1)
                 elif k == len(buckets):
-                    key = (buckets[k-1], float("inf"))
+                    key = (buckets[k-1], max)
                 elif k == 0:
-                    key = (float("-inf"), buckets[k]-1)
+                    key = (min, buckets[k]-1)
                 counters[key] += 1
             yield counters

From 1e7056db8a9ce0364e46ad0e269bc1de9f9925f4 Mon Sep 17 00:00:00 2001
From: Dan McClary
Date: Wed, 12 Mar 2014 10:42:09 -0700
Subject: [PATCH 5/6] added underscore to getBucket

---
 python/pyspark/rdd.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 9cfc2a8028ce8..1db49cf10f38b 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -612,7 +612,7 @@ def sampleVariance(self):
         """
         return self.stats().sampleVariance()

-    def getBuckets(self, bucketCount):
+    def _getBuckets(self, bucketCount):
         #use the statscounter as a quick way of getting max and min
         mm_stats = self.stats()
         min = mm_stats.min()
@@ -637,7 +637,7 @@ def histogram(self, bucketCount, buckets=None):
         max = float("inf")
         evenBuckets = False
         if not buckets:
-            b = self.getBuckets(bucketCount)
+            b = self._getBuckets(bucketCount)
             buckets = b["buckets"]
             min = b["min"]
             max = b["max"]
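After [PATCH 4/6], the derived buckets travel with the RDD's true min and max, so the edge buckets in the result read (67, 100) rather than (67, inf). A local sketch of the boundary derivation in _getBuckets, assuming integer data (Python 2's integer division gives range() an integer step; derive_buckets is an illustrative name):

    def derive_buckets(lo, hi, bucket_count):
        # Evenly spaced boundaries. A zero increment (hi - lo < bucket_count)
        # leaves an empty list, which histogram() then rejects with
        # "requires more than 1 bucket".
        increment = (hi - lo) / bucket_count  # integer division in Python 2
        buckets = range(lo, hi, increment) if increment != 0 else []
        return {"min": lo, "max": hi, "buckets": buckets}

    print derive_buckets(1, 100, 3)["buckets"]  # [1, 34, 67]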
From ee07d3358821ded7cf9d622da04b8e2128b3760a Mon Sep 17 00:00:00 2001
From: Dan McClary
Date: Fri, 14 Mar 2014 11:13:12 -0700
Subject: [PATCH 6/6] cleaned up test for histogram, now depends on SPARK-1248

---
 python/pyspark/rdd.py | 21 +++++++++++++++++++--
 1 file changed, 19 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 1db49cf10f38b..739e00c0aa355 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -536,6 +536,23 @@ def func(iterator):

     # TODO: aggregate

+    def max(self):
+        """
+        Find the maximum item in this RDD.
+
+        >>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).max()
+        43.0
+        """
+        return self.stats().max()
+
+    def min(self):
+        """
+        Find the minimum item in this RDD.
+
+        >>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).min()
+        1.0
+        """
+        return self.stats().min()

     def sum(self):
         """
@@ -631,7 +648,7 @@ def histogram(self, bucketCount, buckets=None):
         evenly spaced between the min and max of the RDD.

         >>> sc.parallelize([1,49, 23, 100, 12, 13, 20, 22, 75, 50]).histogram(3)
-        defaultdict(<type 'int'>, {(67, inf): 2, (1, 33): 6, (34, 66): 2})
+        defaultdict(<type 'int'>, {(67, 100): 2, (1, 33): 6, (34, 66): 2})
         """
         min = float("-inf")
         max = float("inf")
@@ -641,7 +658,7 @@ def histogram(self, bucketCount, buckets=None):
         buckets = b["buckets"]
         min = b["min"]
         max = b["max"]
-
+
         if len(buckets) < 2:
             raise ValueError("requires more than 1 bucket")
         if len(buckets) % 2 == 0:
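With the series applied, the new API can be exercised as below: a usage sketch assuming a live SparkContext bound to sc, as in the doctests. (evenBuckets is computed but never read in these patches; it looks like groundwork for a fast path over evenly spaced boundaries.)

    data = sc.parallelize([1, 49, 23, 100, 12, 13, 20, 22, 75, 50])
    print data.min()  # 1
    print data.max()  # 100
    print data.histogram(3)
    # defaultdict(<type 'int'>, {(67, 100): 2, (1, 33): 6, (34, 66): 2})  (key order may vary)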