| Trees | Indices | Help |
|
|---|
|
|
1 # Copyright (C) 2005, 2006 Joint Institute for VLBI in Europe
2 #
3 # This program is free software; you can redistribute it and/or modify
4 # it under the terms of the GNU General Public License as published by
5 # the Free Software Foundation; either version 2 of the License, or
6 # (at your option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU General Public License for more details.
12
13 # You should have received a copy of the GNU General Public License
14 # along with this program; if not, write to the Free Software
15 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
16
17
18 # This module provides the AIPSTask class. It adapts the Task class from
19 # the Task module to be able to run classic AIPS tasks:
20 #
21 # >>> imean = AIPSTask('imean')
22 #
23 # The resulting class instance has all associated adverbs as attributes:
24 #
25 # >>> print imean.ind
26 # 0.0
27 # >>> imean.ind = 1
28 # >>> print imean.indisk
29 # 1.0
30 # >>> imean.indi = 2.0
31 # >>> print imean.ind
32 # 2.0
33 #
34 # It also knows the range for these attributes:
35 #
36 # >>> imean.ind = -1
37 # Traceback (most recent call last):
38 # ...
39 # ValueError: value '-1.0' is out of range for attribute 'indisk'
40 # >>> imean.ind = 10.0
41 # Traceback (most recent call last):
42 # ...
43 # ValueError: value '10.0' is out of range for attribute 'indisk'
44 #
45 # >>> imean.inc = 'UVDATA'
46 #
47 # >>> print imean.inclass
48 # UVDATA
49 #
50 # >>> imean.blc[1:] = [128, 128]
51 # >>> print imean.blc
52 # [None, 128.0, 128.0, 0.0, 0.0, 0.0, 0.0, 0.0]
53 #
54 # >>> imean.blc = AIPSList([256, 256])
55 # >>> print imean.blc
56 # [None, 256.0, 256.0, 0.0, 0.0, 0.0, 0.0, 0.0]
57 #
58 # It doesn't hurt to apply AIPSList to a scalar:
59 # >>> AIPSList(1)
60 # 1
61 #
62 # And it works on matrices (lists of lists) too:
63 # >>> AIPSList([[1,2],[3,4],[5,6]])
64 # [None, [None, 1, 2], [None, 3, 4], [None, 5, 6]]
65 #
66 # It should also work for strings:
67 # >>> AIPSList('foobar')
68 # 'foobar'
69 # >>> AIPSList(['foo', 'bar'])
70 # [None, 'foo', 'bar']
71 #
72 # The AIPSTask class implements the copy method:
73 #
74 # >>> imean2 = imean.copy()
75 # >>> print imean2.inclass
76 # UVDATA
77 # >>> imean2.inclass = 'SPLIT'
78 # >>> print imean.inclass
79 # UVDATA
80 #
81 # It also implements the == operator, which checks whether task name and
82 # inputs match:
83 #
84 # >>> imean2 == imean
85 # False
86 # >>> imean2.inclass = 'UVDATA'
87 # >>> imean2 == imean
88 # True
89 #
90 # Make sure we handle multi-dimensional arrays correctly:
91 #
92 # >>> sad = AIPSTask('sad')
93 # >>> sad.dowidth[1][1:] = [2, 2, 2]
94 # >>> sad.dowidth[1]
95 # [None, 2.0, 2.0, 2.0]
96 # >>> sad.dowidth[2]
97 # [None, 1.0, 1.0, 1.0]
98
99
100 # Global AIPS defaults.
101 import AIPS
102
103 # Generic Task implementation.
104 from Task import Task, List
105
106 # Generic Python stuff.
107 import copy, fcntl, glob, os, pickle, pydoc, select, signal, sys
108
109
111
112 """This class implements running AIPS tasks."""
113
114 # Package.
115 _package = 'AIPS'
116
117 # List of adverbs referring to data.
118 _data_adverbs = ['indata', 'outdata',
119 'in2data', 'in3data', 'in4data', 'out2data']
120
121 # List of adverbs referring to disks.
122 _disk_adverbs = ['indisk', 'outdisk',
123 'in2disk', 'in3disk', 'in4disk', 'out2disk']
124
125 # List of adverbs referring to file names.
126 _file_adverbs = ['infile', 'infile2', 'outfile', 'outprint',
127 'ofmfile', 'boxfile', 'oboxfile']
128
129 # List of adverbs referring to channels.
130 _chan_adverbs = ['bchan', 'echan', 'chansel', 'channel']
131
132 # List of adverbs referring to image dimensions.
133 _box_adverbs = ['blc', 'trc', 'tblc', 'ttrc', 'pixxy', 'imsize', 'box',
134 'clbox', 'fldsize', 'pix2xy', 'uvsize']
135
136 # Default version.
137 version = os.environ.get('VERSION', 'NEW')
138
139 # Default user number.
140 userno = -1
141
142 # Default verbosity level.
143 msgkill = 0
144
145 # Default to batch mode.
146 isbatch = 32000
147
148 # This should be set to a file object...
149 log = open("/dev/null",'a')
150
152 Task.__init__(self)
153 self._name = name
154 self._input_list = []
155 self._output_list = []
156 self._message_list = []
157
158 # Optional arguments.
159 if 'version' in kwds:
160 self.version = kwds['version']
161
162 # Update default user number.
163 if self.userno == -1:
164 self.userno = AIPS.userno
165
166 # See if there is a proxy that can hand us the details for
167 # this task.
168 params = None
169 for proxy in AIPS.proxies:
170 try:
171 inst = getattr(proxy, self.__class__.__name__)
172 params = inst.params(name, self.version)
173 except Exception, exception:
174 if AIPS.debuglog:
175 print >>AIPS.debuglog, exception
176 continue
177 break
178 if not params:
179 msg = "%s task '%s' is not available" % (self._package, name)
180 raise RuntimeError, msg
181
182 # The XML-RPC proxy will return the details as a dictionary,
183 # not a class.
184 self._default_dict = params['default_dict']
185 self._input_list = params['input_list']
186 self._output_list = params['output_list']
187 self._min_dict = params['min_dict']
188 self._max_dict = params['max_dict']
189 self._strlen_dict = params['strlen_dict']
190 self._help_string = params['help_string']
191 self._explain_string = params['explain_string']
192 for adverb in self._default_dict:
193 if type(self._default_dict[adverb]) == list:
194 value = self._default_dict[adverb]
195 self._default_dict[adverb] = List(self, adverb, value)
196
197 # Initialize all adverbs to their default values.
198 self.defaults()
199
200 # The maximum value for disk numbers is system-dependent.
201 for name in self._disk_adverbs:
202 if name in self._max_dict:
203 self._max_dict[name] = float(len(AIPS.disks) - 1)
204 pass
205 continue
206
207 # The maximum channel is system-dependent.
208 for name in self._chan_adverbs:
209 if name in self._max_dict:
210 # Assume the default
211 self._max_dict[name] = 16384.0
212 pass
213 continue
214
215 # The maximum image size is system-dependent.
216 for name in self._box_adverbs:
217 if name in self._max_dict:
218 # Assume the default
219 self._max_dict[name] = 32768.0
220 pass
221 continue
222
223 return # __init__
224
226 if self.__class__ != other.__class__:
227 return False
228 if self._name != other._name:
229 return False
230 if self.userno != other.userno:
231 return False
232 for adverb in self._input_list:
233 if self.__dict__[adverb] != other.__dict__[adverb]:
234 return False
235 continue
236 return True
237
239 task = AIPSTask(self._name, version=self.version)
240 task.userno = self.userno
241 for adverb in self._input_list:
242 task.__dict__[adverb] = self.__dict__[adverb]
243 continue
244 return task
245
247 """Set adverbs to their defaults."""
248 for attr in self._default_dict:
249 self.__dict__[attr] = copy.copy(self._default_dict[attr])
250 continue
251 return
252
254 """Display ADVERBS."""
255
256 for adverb in adverbs:
257 if self.__dict__[adverb] == '':
258 print "'%s': ''" % adverb
259 else:
260 value = PythonList(self.__dict__[adverb])
261 print "'%s': %s" % (adverb, value)
262 pass
263 continue
264
265 return
266
268 """Display more help for this task."""
269
270 if self._explain_string:
271 pydoc.pager(self._help_string +
272 64 * '-' + '\n' +
273 self._explain_string)
274 pass
275
276 return
277
282
284 """Display all outputs for this task."""
285 self.__display_adverbs(self._output_list)
286 return
287
289 """ Recursively transform a 'List' into a 'list' """
290
291 if type(value) == List:
292 value = list(value)
293 for i in range(1, len(value)):
294 value[i] = self._retype(value[i])
295 continue
296 pass
297
298 return value
299
301 """Spawn the task."""
302
303 if self.userno == -1:
304 raise RuntimeError, "AIPS user number is not set"
305
306 input_dict = {}
307 for adverb in self._input_list:
308 input_dict[adverb] = self._retype(self.__dict__[adverb])
309
310 # Figure out what proxy to use for running the task, and
311 # translate the related disk numbers.
312 url = None
313 proxy = None
314 for adverb in self._disk_adverbs:
315 if adverb in input_dict:
316 disk = int(input_dict[adverb])
317 if disk == 0:
318 continue
319 if not url and not proxy:
320 url = AIPS.disks[disk].url
321 proxy = AIPS.disks[disk].proxy()
322 proxy.__nonzero__ = lambda: True
323 pass
324 if AIPS.disks[disk].url != url:
325 raise RuntimeError, \
326 "AIPS disks are not on the same machine"
327 input_dict[adverb] = float(AIPS.disks[disk].disk)
328 pass
329 continue
330 if not proxy:
331 raise RuntimeError, \
332 "Unable to determine where to execute task"
333
334 inst = getattr(proxy, self.__class__.__name__)
335 tid = inst.spawn(self._name, self.version, self.userno,
336 self.msgkill, self.isbatch, input_dict)
337
338 self._message_list = []
339 return (proxy, tid)
340
342 """Determine whether the task specified by PROXY and TID has
343 finished."""
344
345 inst = getattr(proxy, self.__class__.__name__)
346 return inst.finished(tid)
347
349 """Return messages for the task specified by PROXY and TID."""
350
351 if not proxy and not tid:
352 return self._message_list
353
354 inst = getattr(proxy, self.__class__.__name__)
355 messages = inst.messages(tid)
356 if not messages:
357 return None
358 for message in messages:
359 self._message_list.append(message[1])
360 if message[0] > abs(self.msgkill):
361 print message[1]
362 pass
363 continue
364 return [message[1] for message in messages]
365
367 """Feed the task specified by PROXY and TID with BANANA."""
368
369 inst = getattr(proxy, self.__class__.__name__)
370 return inst.feed(tid, banana)
371
373 """Wait for the task specified by PROXY and TID to finish."""
374
375 while not self.finished(proxy, tid):
376 self.messages(proxy, tid)
377 inst = getattr(proxy, self.__class__.__name__)
378 output_dict = inst.wait(tid)
379 for adverb in self._output_list:
380 self.__dict__[adverb] = output_dict[adverb]
381 continue
382 return
383
385 """Abort the task specified by PROXY and TID."""
386
387 inst = getattr(proxy, self.__class__.__name__)
388 return inst.abort(tid, sig)
389
391 """Run the task."""
392
393 (proxy, tid) = self.spawn()
394 loglist = []
395 count = 0
396 rotator = ['|\b', '/\b', '-\b', '\\\b']
397 try:
398 try:
399 while not self.finished(proxy, tid):
400 messages = self.messages(proxy, tid)
401 if messages:
402 loglist.extend(messages)
403 elif sys.stdout.isatty() and len(rotator) > 0:
404 sys.stdout.write(rotator[count % len(rotator)])
405 sys.stdout.flush()
406 pass
407 events = select.select([sys.stdin.fileno()], [], [], 0)
408 if sys.stdin.fileno() in events[0]:
409 flags = fcntl.fcntl(sys.stdin.fileno(), fcntl.F_GETFL)
410 flags |= os.O_NONBLOCK
411 fcntl.fcntl(sys.stdin.fileno(), fcntl.F_SETFL, flags)
412 message = sys.stdin.read(1024)
413 flags &= ~os.O_NONBLOCK
414 fcntl.fcntl(sys.stdin.fileno(), fcntl.F_SETFL, flags)
415 self.feed(proxy, tid, message)
416 rotator = []
417 pass
418 count += 1
419 continue
420 pass
421 except KeyboardInterrupt, exception:
422 self.abort(proxy, tid)
423 raise exception
424
425 self.wait(proxy, tid)
426 finally:
427 if self.log:
428 for message in loglist:
429 self.log.write('%s\n' % message)
430 continue
431 self.log.flush()
432 pass
433 else : # use AIPS.log
434 if AIPS.log:
435 for message in loglist:
436 AIPS.log.write('%s\n' % message)
437 continue
438 AIPS.log.flush()
439 pass
440 pass
441 return
442
444 return self.go()
445
447 if name in self._data_adverbs:
448 class _AIPSData: pass
449 value = _AIPSData()
450 prefix = name.replace('data', '')
451 value.name = Task.__getattr__(self, prefix + 'name')
452 value.klass = Task.__getattr__(self, prefix + 'class')
453 value.disk = Task.__getattr__(self, prefix + 'disk')
454 value.seq = Task.__getattr__(self, prefix + 'seq')
455 return value
456 return Task.__getattr__(self, name)
457
459 if name in self._data_adverbs:
460 prefix = name.replace('data', '')
461 Task.__setattr__(self, prefix + 'name', value.name)
462 Task.__setattr__(self, prefix + 'class', value.klass)
463 Task.__setattr__(self, prefix + 'disk', value.disk)
464 Task.__setattr__(self, prefix + 'seq', value.seq)
465 else:
466 # We treat 'infile', 'outfile' and 'outprint' special.
467 # Instead of checking the length of the complete string,
468 # we only check the length of the final component of the
469 # pathname. The backend will split of the direcrory
470 # component and use that as an "area".
471 attr = self._findattr(name)
472 if attr in self._file_adverbs and type(value) == str and \
473 os.path.dirname(value):
474 if len(os.path.basename(value)) > self._strlen_dict[attr] - 2:
475 msg = "string '%s' is too long for attribute '%s'" \
476 % (value, attr)
477 raise ValueError, msg
478 self.__dict__[attr] = value
479 else:
480 Task.__setattr__(self, name, value)
481 pass
482 pass
483 return
484
485 pass # class AIPSTask
486
487
489
490 # Default user number.
491 userno = -1
492
494 # Update default user number.
495 if self.userno == -1:
496 self.userno = AIPS.userno
497 return
498
500 """Zap message log."""
501
502 proxy = AIPS.disks[1].proxy()
503 inst = getattr(proxy, self.__class__.__name__)
504 return inst.zap(self.userno)
505
506 pass # class AIPSMessageLog
507
508
510 """Transform a Python array into an AIPS array.
511
512 Returns a list suitable for using 1-based indices.
513 """
514
515 try:
516 # Make sure we don't consider strings to be lists.
517 if str(list) == list:
518 return list
519 pass
520 except:
521 pass
522
523 try:
524 # Insert 'None' at index zero, and transform LIST's elements.
525 _list = [None]
526 for l in list:
527 _list.append(AIPSList(l))
528 continue
529 return _list
530 except:
531 pass
532
533 # Apparently LIST isn't a list; simply return it unchanged.
534 return list
535
536
538 """Transform an AIPS array into a Python array.
539
540 Returns a list suitable for using normal 0-based indices.
541 """
542
543 try:
544 if list[0] != None:
545 return list
546
547 _list = []
548 for l in list[1:]:
549 _list.append(PythonList(l))
550 continue
551 return _list
552 except:
553 pass
554
555 # Apparently LIST isn't a list; simply return it unchanged.
556 return list
557
558
559 # Tests.
560 if __name__ == '__main__':
561 import doctest, sys
562 results = doctest.testmod(sys.modules[__name__])
563 sys.exit(results[0])
564
| Trees | Indices | Help |
|
|---|
| Generated by Epydoc 3.0.1 on Fri Nov 28 17:13:10 2008 | http://epydoc.sourceforge.net |