1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101 import AIPS
102
103
104 from Task import Task, List
105
106
107 import copy, fcntl, glob, os, pickle, pydoc, select, signal, sys
108
109
111
112 """This class implements running AIPS tasks."""
113
114
115 _package = 'AIPS'
116
117
118 _data_adverbs = ['indata', 'outdata',
119 'in2data', 'in3data', 'in4data', 'out2data']
120
121
122 _disk_adverbs = ['indisk', 'outdisk',
123 'in2disk', 'in3disk', 'in4disk', 'out2disk']
124
125
126 _file_adverbs = ['infile', 'infile2', 'outfile', 'outprint',
127 'ofmfile', 'boxfile', 'oboxfile']
128
129
130 _chan_adverbs = ['bchan', 'echan', 'chansel', 'channel']
131
132
133 _box_adverbs = ['blc', 'trc', 'tblc', 'ttrc', 'pixxy', 'imsize', 'box',
134 'clbox', 'fldsize', 'pix2xy', 'uvsize']
135
136
137 version = os.environ.get('VERSION', 'NEW')
138
139
140 userno = -1
141
142
143 msgkill = 0
144
145
146 isbatch = 32000
147
148
149 log = open("/dev/null",'a')
150
152 Task.__init__(self)
153 self._name = name
154 self._input_list = []
155 self._output_list = []
156 self._message_list = []
157
158
159 if 'version' in kwds:
160 self.version = kwds['version']
161
162
163 if self.userno == -1:
164 self.userno = AIPS.userno
165
166
167
168 params = None
169 for proxy in AIPS.proxies:
170 try:
171 inst = getattr(proxy, self.__class__.__name__)
172 params = inst.params(name, self.version)
173 except Exception, exception:
174 if AIPS.debuglog:
175 print >>AIPS.debuglog, exception
176 continue
177 break
178 if not params:
179 msg = "%s task '%s' is not available" % (self._package, name)
180 raise RuntimeError, msg
181
182
183
184 self._default_dict = params['default_dict']
185 self._input_list = params['input_list']
186 self._output_list = params['output_list']
187 self._min_dict = params['min_dict']
188 self._max_dict = params['max_dict']
189 self._strlen_dict = params['strlen_dict']
190 self._help_string = params['help_string']
191 self._explain_string = params['explain_string']
192 for adverb in self._default_dict:
193 if type(self._default_dict[adverb]) == list:
194 value = self._default_dict[adverb]
195 self._default_dict[adverb] = List(self, adverb, value)
196
197
198 self.defaults()
199
200
201 for name in self._disk_adverbs:
202 if name in self._max_dict:
203 self._max_dict[name] = float(len(AIPS.disks) - 1)
204 pass
205 continue
206
207
208 for name in self._chan_adverbs:
209 if name in self._max_dict:
210
211 self._max_dict[name] = 16384.0
212 pass
213 continue
214
215
216 for name in self._box_adverbs:
217 if name in self._max_dict:
218
219 self._max_dict[name] = 32768.0
220 pass
221 continue
222
223 return
224
226 if self.__class__ != other.__class__:
227 return False
228 if self._name != other._name:
229 return False
230 if self.userno != other.userno:
231 return False
232 for adverb in self._input_list:
233 if self.__dict__[adverb] != other.__dict__[adverb]:
234 return False
235 continue
236 return True
237
239 task = AIPSTask(self._name, version=self.version)
240 task.userno = self.userno
241 for adverb in self._input_list:
242 task.__dict__[adverb] = self.__dict__[adverb]
243 continue
244 return task
245
247 """Set adverbs to their defaults."""
248 for attr in self._default_dict:
249 self.__dict__[attr] = copy.copy(self._default_dict[attr])
250 continue
251 return
252
254 """Display ADVERBS."""
255
256 for adverb in adverbs:
257 if self.__dict__[adverb] == '':
258 print "'%s': ''" % adverb
259 else:
260 value = PythonList(self.__dict__[adverb])
261 print "'%s': %s" % (adverb, value)
262 pass
263 continue
264
265 return
266
268 """Display more help for this task."""
269
270 if self._explain_string:
271 pydoc.pager(self._help_string +
272 64 * '-' + '\n' +
273 self._explain_string)
274 pass
275
276 return
277
282
284 """Display all outputs for this task."""
285 self.__display_adverbs(self._output_list)
286 return
287
289 """ Recursively transform a 'List' into a 'list' """
290
291 if type(value) == List:
292 value = list(value)
293 for i in range(1, len(value)):
294 value[i] = self._retype(value[i])
295 continue
296 pass
297
298 return value
299
301 """Spawn the task."""
302
303 if self.userno == -1:
304 raise RuntimeError, "AIPS user number is not set"
305
306 input_dict = {}
307 for adverb in self._input_list:
308 input_dict[adverb] = self._retype(self.__dict__[adverb])
309
310
311
312 url = None
313 proxy = None
314 for adverb in self._disk_adverbs:
315 if adverb in input_dict:
316 disk = int(input_dict[adverb])
317 if disk == 0:
318 continue
319 if not url and not proxy:
320 url = AIPS.disks[disk].url
321 proxy = AIPS.disks[disk].proxy()
322 proxy.__nonzero__ = lambda: True
323 pass
324 if AIPS.disks[disk].url != url:
325 raise RuntimeError, \
326 "AIPS disks are not on the same machine"
327 input_dict[adverb] = float(AIPS.disks[disk].disk)
328 pass
329 continue
330 if not proxy:
331 raise RuntimeError, \
332 "Unable to determine where to execute task"
333
334 inst = getattr(proxy, self.__class__.__name__)
335 tid = inst.spawn(self._name, self.version, self.userno,
336 self.msgkill, self.isbatch, input_dict)
337
338 self._message_list = []
339 return (proxy, tid)
340
342 """Determine whether the task specified by PROXY and TID has
343 finished."""
344
345 inst = getattr(proxy, self.__class__.__name__)
346 return inst.finished(tid)
347
348 - def messages(self, proxy=None, tid=None):
349 """Return messages for the task specified by PROXY and TID."""
350
351 if not proxy and not tid:
352 return self._message_list
353
354 inst = getattr(proxy, self.__class__.__name__)
355 messages = inst.messages(tid)
356 if not messages:
357 return None
358 for message in messages:
359 self._message_list.append(message[1])
360 if message[0] > abs(self.msgkill):
361 print message[1]
362 pass
363 continue
364 return [message[1] for message in messages]
365
366 - def feed(self, proxy, tid, banana):
367 """Feed the task specified by PROXY and TID with BANANA."""
368
369 inst = getattr(proxy, self.__class__.__name__)
370 return inst.feed(tid, banana)
371
372 - def wait(self, proxy, tid):
373 """Wait for the task specified by PROXY and TID to finish."""
374
375 while not self.finished(proxy, tid):
376 self.messages(proxy, tid)
377 inst = getattr(proxy, self.__class__.__name__)
378 output_dict = inst.wait(tid)
379 for adverb in self._output_list:
380 self.__dict__[adverb] = output_dict[adverb]
381 continue
382 return
383
384 - def abort(self, proxy, tid, sig=signal.SIGTERM):
385 """Abort the task specified by PROXY and TID."""
386
387 inst = getattr(proxy, self.__class__.__name__)
388 return inst.abort(tid, sig)
389
391 """Run the task."""
392
393 (proxy, tid) = self.spawn()
394 loglist = []
395 count = 0
396 rotator = ['|\b', '/\b', '-\b', '\\\b']
397 try:
398 try:
399 while not self.finished(proxy, tid):
400 messages = self.messages(proxy, tid)
401 if messages:
402 loglist.extend(messages)
403 elif sys.stdout.isatty() and len(rotator) > 0:
404 sys.stdout.write(rotator[count % len(rotator)])
405 sys.stdout.flush()
406 pass
407 events = select.select([sys.stdin.fileno()], [], [], 0)
408 if sys.stdin.fileno() in events[0]:
409 flags = fcntl.fcntl(sys.stdin.fileno(), fcntl.F_GETFL)
410 flags |= os.O_NONBLOCK
411 fcntl.fcntl(sys.stdin.fileno(), fcntl.F_SETFL, flags)
412 message = sys.stdin.read(1024)
413 flags &= ~os.O_NONBLOCK
414 fcntl.fcntl(sys.stdin.fileno(), fcntl.F_SETFL, flags)
415 self.feed(proxy, tid, message)
416 rotator = []
417 pass
418 count += 1
419 continue
420 pass
421 except KeyboardInterrupt, exception:
422 self.abort(proxy, tid)
423 raise exception
424
425 self.wait(proxy, tid)
426 finally:
427 if self.log:
428 for message in loglist:
429 self.log.write('%s\n' % message)
430 continue
431 self.log.flush()
432 pass
433 else :
434 if AIPS.log:
435 for message in loglist:
436 AIPS.log.write('%s\n' % message)
437 continue
438 AIPS.log.flush()
439 pass
440 pass
441 return
442
445
457
459 if name in self._data_adverbs:
460 prefix = name.replace('data', '')
461 Task.__setattr__(self, prefix + 'name', value.name)
462 Task.__setattr__(self, prefix + 'class', value.klass)
463 Task.__setattr__(self, prefix + 'disk', value.disk)
464 Task.__setattr__(self, prefix + 'seq', value.seq)
465 else:
466
467
468
469
470
471 attr = self._findattr(name)
472 if attr in self._file_adverbs and type(value) == str and \
473 os.path.dirname(value):
474 if len(os.path.basename(value)) > self._strlen_dict[attr] - 2:
475 msg = "string '%s' is too long for attribute '%s'" \
476 % (value, attr)
477 raise ValueError, msg
478 self.__dict__[attr] = value
479 else:
480 Task.__setattr__(self, name, value)
481 pass
482 pass
483 return
484
485 pass
486
487
489
490
491 userno = -1
492
498
505
506 pass
507
508
510 """Transform a Python array into an AIPS array.
511
512 Returns a list suitable for using 1-based indices.
513 """
514
515 try:
516
517 if str(list) == list:
518 return list
519 pass
520 except:
521 pass
522
523 try:
524
525 _list = [None]
526 for l in list:
527 _list.append(AIPSList(l))
528 continue
529 return _list
530 except:
531 pass
532
533
534 return list
535
536
538 """Transform an AIPS array into a Python array.
539
540 Returns a list suitable for using normal 0-based indices.
541 """
542
543 try:
544 if list[0] != None:
545 return list
546
547 _list = []
548 for l in list[1:]:
549 _list.append(PythonList(l))
550 continue
551 return _list
552 except:
553 pass
554
555
556 return list
557
558
559
560 if __name__ == '__main__':
561 import doctest, sys
562 results = doctest.testmod(sys.modules[__name__])
563 sys.exit(results[0])
564