-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathtask_manager
More file actions
137 lines (124 loc) · 4.67 KB
/
task_manager
File metadata and controls
137 lines (124 loc) · 4.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#!python
#
# Send inquiries to an active task manager
# (see Scientific.DistributedComputing)
# Written by Konrad Hinsen <hinsen@cnrs-orleans.fr>
# last revision: 2010-12-18
#
from Scientific.DistributedComputing import TaskManager
import Pyro.core
import Pyro.naming
import Pyro.errors
from optparse import OptionParser
import os, sys, time
Pyro.core.initClient(banner=False)
commands = {}
arg_count = {}
def getTaskManager(label):
if options.master is None:
if options.wait:
while True:
try:
return Pyro.core.getProxyForURI("PYRONAME://TaskManager.%s"
% label)
except Pyro.errors.NamingError:
time.sleep(1)
else:
try:
return Pyro.core.getProxyForURI("PYRONAME://TaskManager.%s"
% label)
except Pyro.errors.NamingError:
print "No name server or no task manager with label %s" % label
raise SystemExit
else:
if options.wait:
while True:
try:
uri = "PYROLOC://%s/TaskManager.%s" % (options.master, label)
return Pyro.core.getProxyForURI(uri)
except Pyro.errors.ProtocolError:
time.sleep(1)
except Pyro.errors.URIError:
print "No host %s" % options.master
raise SystemExit
else:
try:
uri = "PYROLOC://%s/TaskManager.%s" % (options.master, label)
return Pyro.core.getProxyForURI(uri)
except Pyro.errors.URIError:
print "No host %s" % options.master
raise SystemExit
except Pyro.errors.ProtocolError:
print "No task manager with label %s" % label
raise SystemExit
def showInfo(label):
task_manager = getTaskManager(label)
active_processes = task_manager.numberOfActiveProcesses()
waiting, running, finished = task_manager.numberOfTasks()
print "%d active processe(s)" % active_processes
if options.verbose:
for i in range(active_processes):
print " %d: %s" % (i, task_manager.activeProcessInfo(i))
print "%d waiting tasks" % sum(waiting.values())
if options.verbose:
for tag, count in waiting.items():
print " %s: %d" % (tag, count)
print "%d running tasks" % sum(running.values())
if options.verbose:
for tag, count in running.items():
print " %s: %d" % (tag, count)
print "%d results waiting to be retrieved" % sum(finished.values())
if options.verbose:
for tag, count in finished.items():
print " %s: %d" % (tag, count)
commands["show"] = showInfo
arg_count["show"] = 2
def listTaskManagers():
if options.master is not None:
print "Command 'list' requires a name server"
raise SystemExit
pyro_ns = Pyro.naming.NameServerLocator().getNS()
for label, type in pyro_ns.list("TaskManager"):
if type == 1:
print label
if options.verbose:
task_manager = getTaskManager(label)
active_processes = task_manager.numberOfActiveProcesses()
print " %d active processe(s)" % active_processes
commands["list"] = listTaskManagers
arg_count["list"] = 1
def runSlave(label):
task_manager = getTaskManager(label)
try:
slave_code = task_manager.retrieveData("slave_code")
directory = task_manager.retrieveData("cwd")
except KeyError:
print "No slave code available for %s" % label
raise SystemExit
namespace = {}
sys.modules["__main__"].SLAVE_PROCESS_LABEL = label
sys.modules["__main__"].SLAVE_NAMESPACE = namespace
os.chdir(directory)
exec slave_code in namespace
commands["slave"] = runSlave
arg_count["slave"] = 2
# Parse command line and execute command
parser = OptionParser(usage="Usage: %prog [options] command [label]")
parser.add_option("-m", "--master",
help="hostname of the machine running the master process")
parser.add_option("-v", "--verbose",
action="store_true",
help="verbose output")
parser.add_option("-w", "--wait",
action="store_true",
help="wait for master to be launched")
options, args = parser.parse_args()
if len(args) < 1:
parser.error("incorrect number of arguments")
try:
command = commands[args[0]]
except KeyError:
parser.error("unknown command %s" % args[0])
if len(args) != arg_count[args[0]]:
parser.error("incorrect number of arguments")
command(*args[1:])