-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathMapReduce.py
More file actions
189 lines (142 loc) · 5.92 KB
/
MapReduce.py
File metadata and controls
189 lines (142 loc) · 5.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
"""
Created on Sun Jan 26 16:51:40 2020
@author: Neeraj
Description: An impelementation of mapper and reducer functions with a few
examples in Python.
Reference: Chapter 25 MapReduce
"""
from typing import Iterable, Iterator, Tuple, List
def tokenize(document: str) -> List[str]:
"""Just split on whitespace"""
return document.split()
def wc_mapper(document: str) -> Iterator[Tuple[str, int]]:
"""For each word in a document, emit (word, 1)"""
for word in tokenize(document):
yield (word, 1)
def wc_reducer(word: str,
counts: Iterable[int]) -> Iterator[Tuple[str, int]]:
"""Sum up the counts for a word"""
yield (word, sum(counts))
from collections import defaultdict
def word_count(documents: List[str]) -> List[Tuple[str, int]]:
"""Count the words in the input documents using MapReduce"""
collector = defaultdict(list) # to store grouped values
for document in documents:
for word, count in wc_mapper(document):
collector[word].append(count)
return [output
for word, counts in collector.items()
for output in wc_reducer(word, counts)]
# Writing a general map_reduce function
from typing import Callable, Iterable, Any, Tuple
# A key/value pair is just a 2-tuple
KV = Tuple[Any, Any]
# A Mapper is a function that returns an Iterable of key/value pairs
Mapper = Callable[..., Iterable[KV]]
# A Reducer is a function that takes a key and an iterable of values
# and returns a key/value pair
Reducer = Callable[[Any, Iterable], KV]
def map_reduce(inputs: Iterable,
mapper: Mapper,
reducer: Reducer) -> List[KV]:
"""Run MapReduce on the inputs using mapper and reducer"""
collector = defaultdict(list)
for input in inputs:
for key, value in mapper(input):
collector[key].append(value)
return [output
for key, values in collector.items()
for output in reducer(key, values)]
def values_reducer(values_fn: Callable) -> Reducer:
"""Return a reducer that just applies values_fn to its values"""
def reduce(key, values: Iterable) -> KV:
return (key, values_fn(values))
return reduce
sum_reducer = values_reducer(sum)
max_reducer = values_reducer(max)
min_reducer = values_reducer(min)
count_distinct_reducer = values_reducer(lambda values: len(set(values)))
status_updates = [
{"id": 2,
"username" : "neerajkumar",
"text" : "Should I use the second edition of Joel's data science book?",
"created_at" : datetime.datetime(2018, 2, 21, 11, 47, 0),
"liked_by" : ["data_guy", "data_gal", "mike"] },
# ...
]
def data_science_day_mapper(status_update: dict) -> Iterable:
"""Yields (day_of_week, 1) of status_update contains 'data science'"""
if "data science" in status_update["text"].lower():
day_of_week = status_update["created_at"].weekday()
yield (day_of_week, 1)
data_science_days = map_reduce(status_updates, data_science_day_mapper,
sum_reducer)
from collections import Counter
def words_per_user_mapper(status_update: dict):
user = status_update["username"]
for word in tokenize(status_update["text"]):
yield (user, (word, 1))
def most_popular_word_reducer(user: str,
words_and_counts: Iterable[KV]):
"""Given a sequence of (word, count) pairs,
return the word with the highest total count"""
word_counts = Counter()
for word, count in words_and_counts:
word_counts[word] += count
word, count = word_counts.most_common(1)[0]
yield (user, (word, count))
user_words = map_reduce(status_updates,
words_per_user_mapper,
most_popular_word_reducer)
def liker_mapper(status_update: dict):
user = status_update["username"]
for liker in status_update["liked_by"]:
yield (user, liker)
distinct_likers_per_user = map_reduce(status_updates,
liker_mapper,
count_distinct_reducer)
from typing import NamedTuple
class Entry(NamedTuple):
name: str
i: int
j: int
value: float
def matrix_multiply_mapper(num_rows_a: int, num_cols_b: int) -> Mapper:
# C[x][y] = A[x][0] * B[0][y] + ... + A[x][m] * B[m][y]
#
# so an element A[i][j] goes into every C[i][y] with coef B[j][y]
# and an element B[i][j] goes into every C[x][j] with coef A[x][i]
def mapper(entry: Entry):
if entry.name == "A":
for y in range(num_cols_b):
key = (entry.i, y) # which element of C
value = (entry.j, entry.value) # which entry in the sum
yield (key, value)
else:
for x in range(num_rows_a):
key = (x, entry.j) # which element of C
value = (entry.i, entry.value) # which entry in the sum
yield (key, value)
return mapper
def matrix_multiply_reducer(key: Tuple[int, int],
indexed_values: Iterable[Tuple[int, int]]):
results_by_index = defaultdict(list)
for index, value in indexed_values:
results_by_index[index].append(value)
# Multiply the values for positions with two values
# (one from A, and one from B) and sum them up.
sumproduct = sum(values[0] * values[1]
for values in results_by_index.values()
if len(values) == 2)
if sumproduct != 0.0:
yield (key, sumproduct)
A = [[3, 2, 0],
[0, 0, 0]]
B = [[4, -1, 0],
[10, 0, 0],
[0, 0, 0]]
entries = [Entry("A", 0, 0, 3), Entry("A", 0, 1, 2), Entry("B", 0, 0, 4),
Entry("B", 0, 1, -1), Entry("B", 1, 0, 10)]
mapper = matrix_multiply_mapper(num_rows_a=2, num_cols_b=3)
reducer = matrix_multiply_reducer
set(map_reduce(entries, mapper, reducer))