Module ParallelTask
[hide private]
[frames] | no frames]

Source Code for Module ParallelTask

  1  # Copyright (C) 2005, 2006 Joint Institute for VLBI in Europe 
  2  # 
  3  # This program is free software; you can redistribute it and/or modify 
  4  # it under the terms of the GNU General Public License as published by 
  5  # the Free Software Foundation; either version 2 of the License, or 
  6  # (at your option) any later version. 
  7  # 
  8  # This program is distributed in the hope that it will be useful, 
  9  # but WITHOUT ANY WARRANTY; without even the implied warranty of 
 10  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 11  # GNU General Public License for more details. 
 12   
 13  # You should have received a copy of the GNU General Public License 
 14  # along with this program; if not, write to the Free Software 
 15  # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA 
 16   
 17  """ 
 18  This module acts as a container for AIPSTask objects, and is responsible for 
 19  farming out jobs in parallel to a cluster whose individual nodes run XML-RPC 
 20  servers. Assumptions inherent in the current implementation: 
 21   
 22          1. All data has been copied to disks visible to the remote AIPS client, 
 23          and is present in the appropriate AIPS disk catalogue. 
 24   
 25          2. An XML-RPC server must already be running on each of the intended 
 26          computational nodes. (duh) 
 27  """ 
 28   
 29  import os, sys 
 30  from Task import Task 
 31  from AIPSTask import * 
 32  import AIPS 
 33   
class ParallelTask:
    """
    Bookkeeping record that pairs a task with its remote execution handle.

    ParallelQueue stores the (proxy, tid) pair returned by the wrapped
    task's spawn() call in `proxy` and `tid`, and sets `finished` once the
    task reports completion.
    """

    # Class-level defaults; ParallelQueue overrides them per instance.
    proxy = None        # first element of the pair returned by task.spawn()
    tid = None          # second element of the pair returned by task.spawn()
    finished = False    # set to True by ParallelQueue.finished()

    def __init__(self, task):
        """Wrap `task`, the object that ParallelQueue will later spawn."""
        self.task = task

    def spawn(self, popsnum=None):
        """
        Placeholder: does nothing and returns None.

        `popsnum` is accepted but currently unused; it is optional so the
        method can be called with or without it.
        """
        pass
43
class ParallelQueue:
    """
    Our container class for ParallelTask objects. Also contains methods for
    despatching and monitoring tasks.
    """

    def __init__(self):
        self._tasklist = []     # ParallelTask wrappers not yet reaped
        self._current = 0       # index of the next task to be despatched

    def queue(self, task):
        """
        Append `task` to the queue.

        Returns the new queue length. If `task` is not an AIPSTask instance,
        prints a message and returns None without queueing anything.
        """
        if not isinstance(task, AIPSTask):
            # Parenthesised single-argument form prints the same text whether
            # `print` is a statement or a function.
            print('Argument is not an AIPSTask')
            return None
        self._tasklist.append(ParallelTask(task))
        return len(self._tasklist)

    def finished(self):
        """
        Returns True if any queued task has completed.

        Every queued entry is polled (no short-circuit), and each entry
        whose task reports completion is flagged with `finished = True`.
        """
        anydone = False
        for entry in self._tasklist:
            if entry.task.finished(entry.proxy, entry.tid):
                entry.finished = True
                anydone = True
        return anydone

    def go(self):
        """
        Run the remainder of the task queue.

        Spawns every task that has not been despatched yet, recording the
        (proxy, tid) pair each spawn() returns, then calls queuewait()
        repeatedly until the queue is empty.
        """
        while self._current < len(self._tasklist):
            entry = self._tasklist[self._current]
            entry.proxy, entry.tid = entry.task.spawn()
            self._current += 1
        while self._tasklist:
            self.queuewait()

    def queuewait(self):
        """
        Wait until at least one queued task completes, then reap it.

        While no task has completed, each task's pending messages are
        written to that task's log. Afterwards every entry flagged as
        finished has wait() called on its task and is removed from the
        queue. Returns immediately if the queue is empty.
        """
        # An empty queue can never report completion, so guard against
        # spinning forever.
        # NOTE(review): this loop polls without sleeping -- confirm that is
        # acceptable for the remote servers being queried.
        while self._tasklist and not self.finished():
            for entry in self._tasklist:
                notes = entry.task.messages(entry.proxy, entry.tid)
                if notes:
                    for note in notes:
                        entry.task.log.write('%s\n' % note)
                    entry.task.log.flush()

        # Walk backwards so deletions do not shift indices still to be visited.
        for j in range(len(self._tasklist) - 1, -1, -1):
            entry = self._tasklist[j]
            if entry.finished:
                entry.task.wait(entry.proxy, entry.tid)
                del self._tasklist[j]
                # Keep the despatch index aligned with the shortened list so
                # tasks queued later are still spawned by go().
                if j < self._current:
                    self._current -= 1
107