Module AIPSTask
[hide private]
[frames] | no frames]

Source Code for Module AIPSTask

  1  # Copyright (C) 2005, 2006 Joint Institute for VLBI in Europe 
  2  # 
  3  # This program is free software; you can redistribute it and/or modify 
  4  # it under the terms of the GNU General Public License as published by 
  5  # the Free Software Foundation; either version 2 of the License, or 
  6  # (at your option) any later version. 
  7  # 
  8  # This program is distributed in the hope that it will be useful, 
  9  # but WITHOUT ANY WARRANTY; without even the implied warranty of 
 10  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 11  # GNU General Public License for more details. 
 12   
 13  # You should have received a copy of the GNU General Public License 
 14  # along with this program; if not, write to the Free Software 
 15  # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA 
 16   
 17   
 18  # This module provides the AIPSTask class.  It adapts the Task class from 
 19  # the Task module to be able to run classic AIPS tasks: 
 20  #  
 21  # >>> imean = AIPSTask('imean') 
 22  #  
 23  # The resulting class instance has all associated adverbs as attributes: 
 24  #  
 25  # >>> print imean.ind 
 26  # 0.0 
 27  # >>> imean.ind = 1 
 28  # >>> print imean.indisk 
 29  # 1.0 
 30  # >>> imean.indi = 2.0 
 31  # >>> print imean.ind 
 32  # 2.0 
 33  #  
 34  # It also knows the range for these attributes: 
 35  #  
 36  # >>> imean.ind = -1 
 37  # Traceback (most recent call last): 
 38  #   ... 
 39  # ValueError: value '-1.0' is out of range for attribute 'indisk' 
 40  # >>> imean.ind = 10.0 
 41  # Traceback (most recent call last): 
 42  #   ... 
 43  # ValueError: value '10.0' is out of range for attribute 'indisk' 
 44  #  
 45  # >>> imean.inc = 'UVDATA' 
 46  #  
 47  # >>> print imean.inclass 
 48  # UVDATA 
 49  #  
 50  # >>> imean.blc[1:] = [128, 128] 
 51  # >>> print imean.blc 
 52  # [None, 128.0, 128.0, 0.0, 0.0, 0.0, 0.0, 0.0] 
 53  #  
 54  # >>> imean.blc = AIPSList([256, 256]) 
 55  # >>> print imean.blc 
 56  # [None, 256.0, 256.0, 0.0, 0.0, 0.0, 0.0, 0.0] 
 57  #  
 58  # It doesn't hurt to apply AIPSList to a scalar: 
 59  # >>> AIPSList(1) 
 60  # 1 
 61  #  
 62  # And it works on matrices (lists of lists) too: 
 63  # >>> AIPSList([[1,2],[3,4],[5,6]]) 
 64  # [None, [None, 1, 2], [None, 3, 4], [None, 5, 6]] 
 65  #  
 66  # It should also work for strings: 
 67  # >>> AIPSList('foobar') 
 68  # 'foobar' 
 69  # >>> AIPSList(['foo', 'bar']) 
 70  # [None, 'foo', 'bar'] 
 71  #  
 72  # The AIPSTask class implements the copy method: 
 73  #  
 74  # >>> imean2 = imean.copy() 
 75  # >>> print imean2.inclass 
 76  # UVDATA 
 77  # >>> imean2.inclass = 'SPLIT' 
 78  # >>> print imean.inclass 
 79  # UVDATA 
 80  #  
 81  # It also implements the == operator, which checks whether task name and 
 82  # inputs match: 
 83  #  
 84  # >>> imean2 == imean 
 85  # False 
 86  # >>> imean2.inclass = 'UVDATA' 
 87  # >>> imean2 == imean 
 88  # True 
 89  #  
 90  # Make sure we handle multi-dimensional arrays correctly: 
 91  #  
 92  # >>> sad = AIPSTask('sad') 
 93  # >>> sad.dowidth[1][1:] = [2, 2, 2] 
 94  # >>> sad.dowidth[1] 
 95  # [None, 2.0, 2.0, 2.0] 
 96  # >>> sad.dowidth[2] 
 97  # [None, 1.0, 1.0, 1.0] 
 98   
 99   
100  # Global AIPS defaults. 
101  import AIPS 
102   
103  # Generic Task implementation. 
104  from Task import Task, List 
105   
106  # Generic Python stuff. 
107  import copy, fcntl, glob, os, pickle, pydoc, select, signal, sys 
108   
109   
110 -class AIPSTask(Task):
111 112 """This class implements running AIPS tasks.""" 113 114 # Package. 115 _package = 'AIPS' 116 117 # List of adverbs referring to data. 118 _data_adverbs = ['indata', 'outdata', 119 'in2data', 'in3data', 'in4data', 'out2data'] 120 121 # List of adverbs referring to disks. 122 _disk_adverbs = ['indisk', 'outdisk', 123 'in2disk', 'in3disk', 'in4disk', 'out2disk'] 124 125 # List of adverbs referring to file names. 126 _file_adverbs = ['infile', 'infile2', 'outfile', 'outprint', 127 'ofmfile', 'boxfile', 'oboxfile'] 128 129 # List of adverbs referring to channels. 130 _chan_adverbs = ['bchan', 'echan', 'chansel', 'channel'] 131 132 # List of adverbs referring to image dimensions. 133 _box_adverbs = ['blc', 'trc', 'tblc', 'ttrc', 'pixxy', 'imsize', 'box', 134 'clbox', 'fldsize', 'pix2xy', 'uvsize'] 135 136 # Default version. 137 version = os.environ.get('VERSION', 'NEW') 138 139 # Default user number. 140 userno = -1 141 142 # Default verbosity level. 143 msgkill = 0 144 145 # Default to batch mode. 146 isbatch = 32000 147 148 # This should be set to a file object... 149 log = open("/dev/null",'a') 150
151 - def __init__(self, name, **kwds):
152 Task.__init__(self) 153 self._name = name 154 self._input_list = [] 155 self._output_list = [] 156 self._message_list = [] 157 158 # Optional arguments. 159 if 'version' in kwds: 160 self.version = kwds['version'] 161 162 # Update default user number. 163 if self.userno == -1: 164 self.userno = AIPS.userno 165 166 # See if there is a proxy that can hand us the details for 167 # this task. 168 params = None 169 for proxy in AIPS.proxies: 170 try: 171 inst = getattr(proxy, self.__class__.__name__) 172 params = inst.params(name, self.version) 173 except Exception, exception: 174 if AIPS.debuglog: 175 print >>AIPS.debuglog, exception 176 continue 177 break 178 if not params: 179 msg = "%s task '%s' is not available" % (self._package, name) 180 raise RuntimeError, msg 181 182 # The XML-RPC proxy will return the details as a dictionary, 183 # not a class. 184 self._default_dict = params['default_dict'] 185 self._input_list = params['input_list'] 186 self._output_list = params['output_list'] 187 self._min_dict = params['min_dict'] 188 self._max_dict = params['max_dict'] 189 self._strlen_dict = params['strlen_dict'] 190 self._help_string = params['help_string'] 191 self._explain_string = params['explain_string'] 192 for adverb in self._default_dict: 193 if type(self._default_dict[adverb]) == list: 194 value = self._default_dict[adverb] 195 self._default_dict[adverb] = List(self, adverb, value) 196 197 # Initialize all adverbs to their default values. 198 self.defaults() 199 200 # The maximum value for disk numbers is system-dependent. 201 for name in self._disk_adverbs: 202 if name in self._max_dict: 203 self._max_dict[name] = float(len(AIPS.disks) - 1) 204 pass 205 continue 206 207 # The maximum channel is system-dependent. 208 for name in self._chan_adverbs: 209 if name in self._max_dict: 210 # Assume the default 211 self._max_dict[name] = 16384.0 212 pass 213 continue 214 215 # The maximum image size is system-dependent. 216 for name in self._box_adverbs: 217 if name in self._max_dict: 218 # Assume the default 219 self._max_dict[name] = 32768.0 220 pass 221 continue 222 223 return # __init__
224
225 - def __eq__(self, other):
226 if self.__class__ != other.__class__: 227 return False 228 if self._name != other._name: 229 return False 230 if self.userno != other.userno: 231 return False 232 for adverb in self._input_list: 233 if self.__dict__[adverb] != other.__dict__[adverb]: 234 return False 235 continue 236 return True
237
238 - def copy(self):
239 task = AIPSTask(self._name, version=self.version) 240 task.userno = self.userno 241 for adverb in self._input_list: 242 task.__dict__[adverb] = self.__dict__[adverb] 243 continue 244 return task
245
246 - def defaults(self):
247 """Set adverbs to their defaults.""" 248 for attr in self._default_dict: 249 self.__dict__[attr] = copy.copy(self._default_dict[attr]) 250 continue 251 return
252
253 - def __display_adverbs(self, adverbs):
254 """Display ADVERBS.""" 255 256 for adverb in adverbs: 257 if self.__dict__[adverb] == '': 258 print "'%s': ''" % adverb 259 else: 260 value = PythonList(self.__dict__[adverb]) 261 print "'%s': %s" % (adverb, value) 262 pass 263 continue 264 265 return
266
267 - def explain(self):
268 """Display more help for this task.""" 269 270 if self._explain_string: 271 pydoc.pager(self._help_string + 272 64 * '-' + '\n' + 273 self._explain_string) 274 pass 275 276 return
277
278 - def inputs(self):
279 """Display all inputs for this task.""" 280 self.__display_adverbs(self._input_list) 281 return
282
283 - def outputs(self):
284 """Display all outputs for this task.""" 285 self.__display_adverbs(self._output_list) 286 return
287
288 - def _retype(self, value):
289 """ Recursively transform a 'List' into a 'list' """ 290 291 if type(value) == List: 292 value = list(value) 293 for i in range(1, len(value)): 294 value[i] = self._retype(value[i]) 295 continue 296 pass 297 298 return value
299
300 - def spawn(self):
301 """Spawn the task.""" 302 303 if self.userno == -1: 304 raise RuntimeError, "AIPS user number is not set" 305 306 input_dict = {} 307 for adverb in self._input_list: 308 input_dict[adverb] = self._retype(self.__dict__[adverb]) 309 310 # Figure out what proxy to use for running the task, and 311 # translate the related disk numbers. 312 url = None 313 proxy = None 314 for adverb in self._disk_adverbs: 315 if adverb in input_dict: 316 disk = int(input_dict[adverb]) 317 if disk == 0: 318 continue 319 if not url and not proxy: 320 url = AIPS.disks[disk].url 321 proxy = AIPS.disks[disk].proxy() 322 proxy.__nonzero__ = lambda: True 323 pass 324 if AIPS.disks[disk].url != url: 325 raise RuntimeError, \ 326 "AIPS disks are not on the same machine" 327 input_dict[adverb] = float(AIPS.disks[disk].disk) 328 pass 329 continue 330 if not proxy: 331 raise RuntimeError, \ 332 "Unable to determine where to execute task" 333 334 inst = getattr(proxy, self.__class__.__name__) 335 tid = inst.spawn(self._name, self.version, self.userno, 336 self.msgkill, self.isbatch, input_dict) 337 338 self._message_list = [] 339 return (proxy, tid)
340
341 - def finished(self, proxy, tid):
342 """Determine whether the task specified by PROXY and TID has 343 finished.""" 344 345 inst = getattr(proxy, self.__class__.__name__) 346 return inst.finished(tid)
347
348 - def messages(self, proxy=None, tid=None):
349 """Return messages for the task specified by PROXY and TID.""" 350 351 if not proxy and not tid: 352 return self._message_list 353 354 inst = getattr(proxy, self.__class__.__name__) 355 messages = inst.messages(tid) 356 if not messages: 357 return None 358 for message in messages: 359 self._message_list.append(message[1]) 360 if message[0] > abs(self.msgkill): 361 print message[1] 362 pass 363 continue 364 return [message[1] for message in messages]
365
366 - def feed(self, proxy, tid, banana):
367 """Feed the task specified by PROXY and TID with BANANA.""" 368 369 inst = getattr(proxy, self.__class__.__name__) 370 return inst.feed(tid, banana)
371
372 - def wait(self, proxy, tid):
373 """Wait for the task specified by PROXY and TID to finish.""" 374 375 while not self.finished(proxy, tid): 376 self.messages(proxy, tid) 377 inst = getattr(proxy, self.__class__.__name__) 378 output_dict = inst.wait(tid) 379 for adverb in self._output_list: 380 self.__dict__[adverb] = output_dict[adverb] 381 continue 382 return
383
384 - def abort(self, proxy, tid, sig=signal.SIGTERM):
385 """Abort the task specified by PROXY and TID.""" 386 387 inst = getattr(proxy, self.__class__.__name__) 388 return inst.abort(tid, sig)
389
390 - def go(self):
391 """Run the task.""" 392 393 (proxy, tid) = self.spawn() 394 loglist = [] 395 count = 0 396 rotator = ['|\b', '/\b', '-\b', '\\\b'] 397 try: 398 try: 399 while not self.finished(proxy, tid): 400 messages = self.messages(proxy, tid) 401 if messages: 402 loglist.extend(messages) 403 elif sys.stdout.isatty() and len(rotator) > 0: 404 sys.stdout.write(rotator[count % len(rotator)]) 405 sys.stdout.flush() 406 pass 407 events = select.select([sys.stdin.fileno()], [], [], 0) 408 if sys.stdin.fileno() in events[0]: 409 flags = fcntl.fcntl(sys.stdin.fileno(), fcntl.F_GETFL) 410 flags |= os.O_NONBLOCK 411 fcntl.fcntl(sys.stdin.fileno(), fcntl.F_SETFL, flags) 412 message = sys.stdin.read(1024) 413 flags &= ~os.O_NONBLOCK 414 fcntl.fcntl(sys.stdin.fileno(), fcntl.F_SETFL, flags) 415 self.feed(proxy, tid, message) 416 rotator = [] 417 pass 418 count += 1 419 continue 420 pass 421 except KeyboardInterrupt, exception: 422 self.abort(proxy, tid) 423 raise exception 424 425 self.wait(proxy, tid) 426 finally: 427 if self.log: 428 for message in loglist: 429 self.log.write('%s\n' % message) 430 continue 431 self.log.flush() 432 pass 433 else : # use AIPS.log 434 if AIPS.log: 435 for message in loglist: 436 AIPS.log.write('%s\n' % message) 437 continue 438 AIPS.log.flush() 439 pass 440 pass 441 return
442
443 - def __call__(self):
444 return self.go()
445
446 - def __getattr__(self, name):
447 if name in self._data_adverbs: 448 class _AIPSData: pass 449 value = _AIPSData() 450 prefix = name.replace('data', '') 451 value.name = Task.__getattr__(self, prefix + 'name') 452 value.klass = Task.__getattr__(self, prefix + 'class') 453 value.disk = Task.__getattr__(self, prefix + 'disk') 454 value.seq = Task.__getattr__(self, prefix + 'seq') 455 return value 456 return Task.__getattr__(self, name)
457
458 - def __setattr__(self, name, value):
459 if name in self._data_adverbs: 460 prefix = name.replace('data', '') 461 Task.__setattr__(self, prefix + 'name', value.name) 462 Task.__setattr__(self, prefix + 'class', value.klass) 463 Task.__setattr__(self, prefix + 'disk', value.disk) 464 Task.__setattr__(self, prefix + 'seq', value.seq) 465 else: 466 # We treat 'infile', 'outfile' and 'outprint' special. 467 # Instead of checking the length of the complete string, 468 # we only check the length of the final component of the 469 # pathname. The backend will split of the direcrory 470 # component and use that as an "area". 471 attr = self._findattr(name) 472 if attr in self._file_adverbs and type(value) == str and \ 473 os.path.dirname(value): 474 if len(os.path.basename(value)) > self._strlen_dict[attr] - 2: 475 msg = "string '%s' is too long for attribute '%s'" \ 476 % (value, attr) 477 raise ValueError, msg 478 self.__dict__[attr] = value 479 else: 480 Task.__setattr__(self, name, value) 481 pass 482 pass 483 return
484 485 pass # class AIPSTask
486 487
488 -class AIPSMessageLog:
489 490 # Default user number. 491 userno = -1 492
493 - def __init__(self):
494 # Update default user number. 495 if self.userno == -1: 496 self.userno = AIPS.userno 497 return
498
499 - def zap(self):
500 """Zap message log.""" 501 502 proxy = AIPS.disks[1].proxy() 503 inst = getattr(proxy, self.__class__.__name__) 504 return inst.zap(self.userno)
505 506 pass # class AIPSMessageLog
507 508
509 -def AIPSList(list):
510 """Transform a Python array into an AIPS array. 511 512 Returns a list suitable for using 1-based indices. 513 """ 514 515 try: 516 # Make sure we don't consider strings to be lists. 517 if str(list) == list: 518 return list 519 pass 520 except: 521 pass 522 523 try: 524 # Insert 'None' at index zero, and transform LIST's elements. 525 _list = [None] 526 for l in list: 527 _list.append(AIPSList(l)) 528 continue 529 return _list 530 except: 531 pass 532 533 # Apparently LIST isn't a list; simply return it unchanged. 534 return list
535 536
537 -def PythonList(list):
538 """Transform an AIPS array into a Python array. 539 540 Returns a list suitable for using normal 0-based indices. 541 """ 542 543 try: 544 if list[0] != None: 545 return list 546 547 _list = [] 548 for l in list[1:]: 549 _list.append(PythonList(l)) 550 continue 551 return _list 552 except: 553 pass 554 555 # Apparently LIST isn't a list; simply return it unchanged. 556 return list
557 558 559 # Tests. 560 if __name__ == '__main__': 561 import doctest, sys 562 results = doctest.testmod(sys.modules[__name__]) 563 sys.exit(results[0]) 564