-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathexecutor.py
More file actions
97 lines (75 loc) · 3 KB
/
executor.py
File metadata and controls
97 lines (75 loc) · 3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import heapq
import random
import threading
import time
def fmt_time(t_s):
    """Return epoch seconds *t_s* rendered as a UTC timestamp string.

    The value is broken down with ``time.gmtime`` and formatted as
    ``YYYY-MM-DD HH:MM:SS`` followed by whatever ``%Z`` yields for that
    broken-down time.
    """
    utc_fields = time.gmtime(t_s)
    return time.strftime('%Y-%m-%d %H:%M:%S %Z', utc_fields)
class Task:
    """A unit of scheduled work: a command label plus its two timing values.

    Attributes:
        command: label identifying the work (Executor prints it; it is not
            called).
        delay: seconds Executor.execute sleeps before running the command.
        timeunit: seconds Executor.command_exe sleeps to simulate the
            command running.
    """

    def __init__(self, command, delay, timeunit):
        self.command = command
        self.delay = delay
        self.timeunit = timeunit

    def __lt__(self, other):
        """Order tasks by delay.

        Tasks are stored inside heap tuples keyed by delay. When two keys
        tie, tuple comparison falls through to the Task objects themselves,
        which raises TypeError unless an ordering is defined.
        """
        if not isinstance(other, Task):
            return NotImplemented
        return self.delay < other.delay

    def __repr__(self):
        return (f"{type(self).__name__}(command={self.command!r}, "
                f"delay={self.delay!r}, timeunit={self.timeunit!r})")
class Executor:
    """Toy scheduled executor driven by a producer and a consumer thread.

    Tasks are kept in a min-heap ordered by their delay. The consumer pops
    each task as soon as one is available and hands it to a fresh worker
    thread, which sleeps for the task's delay and then "runs" the command
    (simulated by sleeping for ``timeunit`` seconds).

    Attributes:
        queue: heap of ``(delay, sequence, Task)`` entries. The sequence
            number breaks ties so two Task objects are never compared.
        lock: mutex guarding ``queue`` and the shutdown flag.
        not_empty: condition bound to ``lock``; notified when a task is
            queued or shutdown is requested.
        open_threads: worker threads started by the consumer that have not
            been joined yet.
    """

    def __init__(self):
        self.queue = []
        self.lock = threading.Lock()
        self.not_empty = threading.Condition(self.lock)
        self.open_threads = []
        # Insertion counter used as the heap tie-breaker.
        self._sequence = 0
        # Set by shutdown(); tells the consumer to exit once drained.
        self._closed = False

    def schedule(self, command, delay: int, timeunit):
        """Queue a one-shot task that becomes enabled after ``delay``.

        Args:
            command: label of the work to run (printed, not called).
            delay: seconds the worker sleeps after picking the task up.
            timeunit: seconds the simulated command takes to run.
        """
        with self.not_empty:
            entry = (delay, self._sequence, Task(command, delay, timeunit))
            self._sequence += 1
            heapq.heappush(self.queue, entry)
            self.not_empty.notify()

    def scheduleAtFixedRate(self, command, initialDelay, timeperiod,
                            timeunit):
        """Intended to create a periodic action that first runs after
        ``initialDelay`` and then at ``initialDelay + period``,
        ``initialDelay + 2 * period``, and so on.

        Not implemented: currently a no-op that returns None.
        """

    def scheduleWithFixedDelay(self, command, initialDelay, delay,
                               timeunit):
        """Intended to create a periodic action that first runs after
        ``initialDelay`` and then again ``delay`` after each execution
        finishes.

        Not implemented: currently a no-op that returns None.
        """

    def shutdown(self):
        """Ask the consumer to exit once every queued task is dispatched."""
        with self.not_empty:
            self._closed = True
            # Wake the consumer even if it is waiting on an empty queue.
            self.not_empty.notify_all()

    def command_exe(self, task):
        """Simulate running ``task``: log, sleep ``task.timeunit``, log."""
        print(f"{fmt_time(time.time())} Starting {task.command} - "
              f"{task.timeunit}")
        time.sleep(task.timeunit)
        print(f"{fmt_time(time.time())} Finished {task.command}")

    def execute(self, task):
        """Worker body: wait out ``task.delay`` seconds, then run the task."""
        print(f"{fmt_time(time.time())} Picked up {task.command}, sleeping "
              f"for {task.delay}")
        time.sleep(task.delay)
        self.command_exe(task)

    def producer(self):
        """Schedule six demo tasks with random delays, one every 0.1 s."""
        for i in range(1, 7):
            command = "task" + str(i)
            delay = random.randint(2, 7)
            timeunit = random.randint(2, 2)
            self.schedule(command, delay, timeunit)
            print(f"Added task {command}, {delay}, {timeunit} to the queue")
            time.sleep(0.1)

    def consumer(self):
        """Dispatch queued tasks to worker threads until shut down.

        Returns once shutdown() has been called and the queue is empty.
        """
        while True:
            with self.not_empty:
                # Re-check the predicate in a loop: a notify sent before we
                # started waiting would otherwise be lost, and a spurious
                # wakeup would pop from an empty heap.
                while not self.queue and not self._closed:
                    self.not_empty.wait()
                if not self.queue:
                    return  # shut down and fully drained
                entry = heapq.heappop(self.queue)
            worker = threading.Thread(target=self.execute, args=(entry[-1],))
            worker.start()
            # Drop finished workers so the list does not grow without bound.
            self.open_threads = [t for t in self.open_threads if t.is_alive()]
            self.open_threads.append(worker)

    def __call__(self, *args, **kwargs):
        """Run the demo: start consumer and producer, wait for all tasks.

        Returns after the producer has finished, the consumer has dispatched
        every queued task, and all worker threads have completed.
        """
        print(f"{fmt_time(time.time())}")
        with self.not_empty:
            self._closed = False  # allow the executor to be invoked again
        producer_thread = threading.Thread(target=self.producer, args=(),
                                           name="Producer_Thread")
        consumer_thread = threading.Thread(target=self.consumer, args=(),
                                           name="Consumer_Thread")
        consumer_thread.start()
        producer_thread.start()
        producer_thread.join()
        # Nothing more will be scheduled; let the consumer drain and exit
        # instead of waiting forever.
        self.shutdown()
        consumer_thread.join()
        for worker in self.open_threads:
            worker.join()
        self.open_threads.clear()
# Script entry point: build an Executor and invoke it, which runs the
# producer/consumer demo defined in Executor.__call__. There is no
# `if __name__ == "__main__"` guard, so this also runs on import.
exec1 = Executor()
exec1()