1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 """
18 This module acts as a container for AIPSTask objects, and is responsible for
19 farming out jobs in parallel to a cluster whose individual nodes run XML-RPC
20 servers. Assumptions inherent in the current implementation:
21
22 1. All data has been copied to disks visible to the remote AIPS client,
23 and is present in the appropriate AIPS disk catalogue.
24
25 2. An XML-RPC server must already be running on each of the intended
26 computational nodes. (duh)
27 """
28
29 import os, sys
30 from Task import Task
31 from AIPSTask import *
32 import AIPS
33
43
45 """
46 Our container class for ParallelTask objects. Also contains methods for
47 despatching and monitoring tasks.
48 """
49
# NOTE(review): the ``def`` line for this body is missing from the listing.
# ``__init__(self)`` is assumed because these two statements create the
# attributes that the other methods read -- confirm against the original.
def __init__(self):
    """Start with an empty queue and nothing spawned."""
    # Queued task wrappers, in the order they were added.
    self._tasklist = []
    # Index into _tasklist of the next entry that has not been spawned yet.
    self._current = 0
53
# NOTE(review): the ``def`` line for this body is missing from the listing.
# ``self`` and ``task`` are the names the body itself uses; the method name
# ``queue`` is an assumption -- confirm against the original file.
def queue(self, task):
    """Append *task* to the queue, wrapped in a ParallelTask.

    Returns the number of queued entries after the append.  When *task* is
    not an AIPSTask instance a diagnostic is printed and None is returned;
    nothing is queued and no exception reaches the caller.
    """
    # Guard clause instead of raising TypeError and catching it on the next
    # line: the observable result (message printed, None returned) is the same.
    if not isinstance(task, AIPSTask):
        # A parenthesised single argument prints identically with the
        # Python 2 print statement and the Python 3 print function.
        print('Argument is not an AIPSTask')
        return None
    self._tasklist.append(ParallelTask(task))
    return len(self._tasklist)
64
# The ``def`` line was missing from the listing; name and signature are
# restored from the ``self.finished()`` call made by the waiting loop.
def finished(self):
    """Returns True if any queued task has completed.

    Every entry is polled through ``task.task.finished(proxy, tid)`` and
    each entry that reports completion gets its ``finished`` flag set.
    """
    anydone = False
    # No early exit: every completed entry must be flagged, not just the
    # first one found.
    for entry in self._tasklist:
        if entry.task.finished(entry.proxy, entry.tid):
            entry.finished = True
            anydone = True
    return anydone
74
# NOTE(review): the ``def`` line for this body is missing from the listing.
# The method name ``go`` is an assumption -- confirm against the original.
def go(self):
    """
    Run the remainder of the task queue.

    Spawns every entry that has not been started yet, then keeps calling
    queuewait() until the queue is empty.
    """
    # Launch phase: start at the first unspawned entry and keep the
    # (proxy, tid) pair that spawn() returns; it is needed for polling.
    for entry in self._tasklist[self._current:]:
        entry.proxy, entry.tid = entry.task.spawn()
        self._current += 1
    # Wait phase: queuewait() deletes completed entries, so an empty list
    # means everything has been reaped.
    while self._tasklist:
        self.queuewait()
    # The list is empty here, so the cursor must go back to the start.
    # Otherwise it keeps pointing past entries that no longer exist and
    # tasks queued after this run would never be spawned.
    self._current = 0
    return
87
# The ``def`` line was missing from the listing; name and signature are
# restored from the ``self.queuewait()`` call made while draining the queue.
# NOTE(review): indentation was lost as well.  The original ``else`` is read
# as belonging to the ``while`` -- the only pairing under which completed
# entries are ever reaped -- and, with no ``break`` in the loop, it is
# written here as plain code after the loop.  Confirm against the original.
def queuewait(self):
    """Wait until at least one queued task completes, then reap it.

    While self.finished() reports nothing complete, each pass copies the
    pending messages of every queued task to that task's log.  Afterwards
    wait() is called on every entry flagged as finished and the entry is
    removed from the queue.
    """
    while not self.finished():
        for entry in self._tasklist:
            notes = entry.task.messages(entry.proxy, entry.tid)
            if notes:
                for note in notes:
                    entry.task.log.write('%s\n' % note)
                entry.task.log.flush()
    # Walk from the back so deleting an entry does not shift the indices
    # that are still to be visited.
    for index in range(len(self._tasklist) - 1, -1, -1):
        entry = self._tasklist[index]
        if entry.finished:
            entry.task.wait(entry.proxy, entry.tid)
            del self._tasklist[index]
    return
105
106 pass
107