-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathworkerPool.py
More file actions
110 lines (68 loc) · 2.78 KB
/
workerPool.py
File metadata and controls
110 lines (68 loc) · 2.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
''' workerPool.py '''
import config
from tile import Tile
from outputFolder import OutputFolder
from Queue import Queue
import threading
import urllib2
class ProgressCounter:
    ''' Counter whose reads and updates are guarded by a threading.Lock. '''

    def __init__(self, initialValue=0):
        ''' Start counting from initialValue (0 unless specified). '''
        self.value = initialValue
        self.lock = threading.Lock()

    def increment(self):
        ''' Add one to the counter and print the new total.

        The print happens while the lock is still held, so the reported
        totals are emitted in counting order.
        '''
        self.lock.acquire()
        try:
            self.value = self.value + 1
            print("processed %i tiles" % self.value)
        finally:
            self.lock.release()

    def getValue(self):
        ''' Return the current count, read under the lock. '''
        self.lock.acquire()
        try:
            return self.value
        finally:
            self.lock.release()
class Worker:
    ''' Pulls tiles off a shared queue, downloads them and writes them out.

    The work loop runs on a thread created by start(). Set the ``daemon``
    attribute to True *before* calling start() to make that thread a
    daemon thread (it defaults to False).
    '''

    def __init__(self, tileQueue, progressCounter):
        ''' Remember the queue to consume and the counter to update.

        tileQueue       -- queue object; get() and task_done() are called on it.
        progressCounter -- object whose increment() is called once per tile
                           that was downloaded and written successfully.
        '''
        self.tileQueue = tileQueue
        self.thread = None
        self.isTimeToDie = False  # kept for compatibility; not read in this class
        # Applied to the thread in start(). Without this, setting
        # worker.daemon from outside had no effect on the real thread.
        self.daemon = False
        self.progressCounter = progressCounter

    def doWork(self):
        ''' Process tiles from the queue forever (thread target).

        For each tile: download its image, hand it to the output folder and
        bump the progress counter. A failure on one tile is reported and the
        loop carries on with the next tile.
        '''
        outputFolder = OutputFolder(config.OUTPUT_ROOT_FOLDER)
        while True:
            tile = self.tileQueue.get()
            try:
                tile.image = self.download(tile)
                outputFolder.write(tile)
                # Count before task_done() so that whoever is blocked in
                # queue.join() sees the complete total once it wakes up.
                self.progressCounter.increment()
            except Exception as error:
                # Keep the worker alive; one bad tile must not stop the pool.
                print("failed to process tile: %s" % error)
            finally:
                # Always mark the item done, otherwise queue.join() would
                # block forever after a failed download or write.
                self.tileQueue.task_done()

    def download(self, tile):
        ''' Fetch the resource at tile.getURL() and return the response body.

        The request is sent once, with a browser-like User-Agent header, and
        the response is closed even if reading it fails. Errors raised by
        urllib2 propagate to the caller.
        '''
        user_agent = 'Mozilla/4.0 (compatible; MSIE 5.5; Windows NT)'
        headers = {'User-Agent': user_agent}
        request = urllib2.Request(tile.getURL(), headers=headers)
        response = urllib2.urlopen(request)
        try:
            return response.read()
        finally:
            response.close()

    def start(self):
        ''' Create the worker thread, apply the daemon flag and start it. '''
        self.thread = threading.Thread(target=self.doWork)
        self.thread.daemon = bool(self.daemon)
        self.thread.start()

    def join(self):
        ''' Block until the worker thread terminates. '''
        self.thread.join()
class WorkerPool:
    ''' Creates a group of Worker objects that all consume one tile queue. '''

    def __init__(self, tileQueue, numWorkers=config.NUM_WORKER_THREADS):
        ''' Store the queue and pool size; no workers are created yet.

        tileQueue  -- queue shared by every worker in the pool.
        numWorkers -- how many workers start() creates
                      (defaults to config.NUM_WORKER_THREADS).
        '''
        self.tileQueue = tileQueue
        self.progressCounter = ProgressCounter()
        self.workers = []
        self.numWorkers = numWorkers

    def start(self):
        ''' Create and start numWorkers workers sharing one progress counter. '''
        print('starting worker threads')
        for _ in range(self.numWorkers):
            newWorker = Worker(self.tileQueue, self.progressCounter)
            # Intent: daemon threads do not have to be join()ed before the
            # program finishes; completion is awaited via tileQueue.join().
            # NOTE(review): this sets an attribute on the Worker wrapper, not
            # on a Thread -- it only takes effect if Worker.start() applies it
            # to the thread it creates. Confirm against Worker.
            newWorker.daemon = True
            newWorker.start()
            self.workers.append(newWorker)

    def join(self):
        ''' Wait for the queue to drain, then print a download summary. '''
        # Returns once every queued tile has been marked done.
        self.tileQueue.join()
        total = self.progressCounter.getValue()
        summary = None
        if total > 1:
            summary = "%i tiles were downloaded" % total
        elif total == 1:
            summary = "1 tile was downloaded"
        elif total == 0:
            summary = "There was nothing to download. (Perhaps they were already downloaded?)"
        if summary is not None:
            print(summary)