1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 """
18
19 This module provides the bits and pieces to implement an AIPSTask
20 proxy object.
21
22 """
23
24
25 from AIPSUtil import ehex
26 from Proxy.AIPS import AIPS
27
28
29 from Proxy.Popsdat import Popsdat
30
31
32 from Proxy.Task import Task
33
34
35 import glob, os, pickle, signal, struct
36
39 """Determine the proper attributes for the AIPS task NAME by
40 parsing its HELP file."""
41
42
43 task = None
44 desc = None
45
46 popsdat = Popsdat(self.version)
47
48 path = self.version + '/HELP/' + name.upper() + '.HLP'
49 input = open(path)
50
51
52 for line in input:
53
54 if line.startswith('--------'):
55 break;
56
57
58 if line.startswith(';'):
59 continue
60
61
62 if line.startswith('\n'):
63 continue
64
65
66 if line.startswith(' '):
67 continue
68
69 if not task:
70 min_start = line.find('LLLLLLLLLLLL')
71 min_end = line.rfind('L')
72 max_start = line.find('UUUUUUUUUUUU')
73 max_end = line.rfind('U')
74 dir_start = min_start - 2
75 dir_end = min_start - 1
76 if not min_start == -1 and not max_start == -1:
77 task = line.split()[0]
78 continue
79
80 if not desc:
81 if line.startswith(task):
82 desc = line
83 continue
84
85 adverb = line.split()[0].lower()
86 code = line[min_start - 1:min_start]
87 if not code:
88 code = ' '
89 try:
90 min = float(line[min_start:min_end])
91 max = float(line[max_start:max_end])
92 except:
93 min = None
94 max = None
95
96 match_key = None
97 if adverb in popsdat.default_dict:
98 match_key = adverb
99 else:
100
101 for key in popsdat.default_dict:
102 if key.startswith(adverb):
103 if match_key:
104 msg = "adverb '%s' is ambiguous" % adverb
105 raise AttributeError, msg
106 else:
107 match_key = key
108 if not match_key:
109 match_key = key
110 self.default_dict[adverb] = popsdat.default_dict[match_key]
111
112 if code in ' *&$':
113 self.input_list.append(adverb)
114 if code in '&%$@':
115 self.output_list.append(adverb)
116 if adverb in popsdat.strlen_dict:
117 self.strlen_dict[adverb] = popsdat.strlen_dict[adverb]
118 if min != None:
119 self.min_dict[adverb] = min
120 if max != None:
121 self.max_dict[adverb] = max
122
123
124 for line in input:
125
126 if line.startswith('--------'):
127 break;
128
129 self.help_string = self.help_string + line
130 continue
131
132
133 for line in input:
134 self.explain_string = self.explain_string + line
135 continue
136
137 pass
138
140 self.default_dict = {}
141 self.input_list = []
142 self.output_list = []
143 self.min_dict = {}
144 self.max_dict = {}
145 self.strlen_dict = {}
146 self.help_string = ''
147 self.explain_string = ''
148
149 self.name = name
150 if version in os.environ:
151 self.version = os.environ[version]
152 else:
153 self.version = os.environ['AIPS_ROOT'] + '/' + version
154 pass
155
156 path = os.environ['HOME'] + '/.ParselTongue/' \
157 + os.path.basename(self.version) + '/' \
158 + name.lower() + '.pickle'
159
160 try:
161 unpickler = pickle.Unpickler(open(path))
162 self.default_dict = unpickler.load()
163 self.input_list = unpickler.load()
164 self.output_list = unpickler.load()
165 self.min_dict = unpickler.load()
166 self.max_dict = unpickler.load()
167 self.strlen_dict = unpickler.load()
168 self.help_string = unpickler.load()
169 self.explain_string = unpickler.load()
170 except (IOError, EOFError):
171 self.__parse(name)
172
173
174 if not os.path.exists(os.path.dirname(path)):
175 os.makedirs(os.path.dirname(path))
176
177 pickler = pickle.Pickler(open(path, mode='w'))
178 pickler.dump(self.default_dict)
179 pickler.dump(self.input_list)
180 pickler.dump(self.output_list)
181 pickler.dump(self.min_dict)
182 pickler.dump(self.max_dict)
183 pickler.dump(self.strlen_dict)
184 pickler.dump(self.help_string)
185 pickler.dump(self.explain_string)
186
187
188
190 return self.__dict__[key]
191
192
194
195
196 _file_adverbs = ['infile', 'infile2', 'outfile', 'outprint',
197 'ofmfile', 'boxfile', 'oboxfile']
198
200 Task.__init__(self)
201 self._params = {}
202 self._popsno = {}
203 self._userno = {}
204 self._msgno = {}
205 self._msgkill = {}
206
207 - def params(self, name, version):
210
def __write_adverb(self, params, file, adverb, value):
    """Write (sub)value VALUE of adverb ADVERB into TD file FILE.

    PARAMS supplies the task's input list and string lengths.  A float
    is written as one native 'f' item.  A string is space-padded to the
    adverb's string length rounded up to a multiple of four bytes.  A
    list is written element by element, skipping element 0.

    Raises AssertionError for any other value type.
    """

    assert adverb in params.input_list

    kind = type(value)
    if kind == float:
        file.write(struct.pack('f', value))
        return

    if kind == str:
        # Round the declared string length up to a whole number of
        # four-byte words.
        width = 4 * ((params.strlen_dict[adverb] + 3) // 4)
        file.write(struct.pack("%ds" % width, value.ljust(width)))
        return

    if kind == list:
        # Element 0 is not part of the data; only value[1:] is written.
        for element in value[1:]:
            self.__write_adverb(params, file, adverb, element)
        return

    raise AssertionError(type(value))
227
def __read_adverb(self, params, file, adverb, value=None):
    """Read (sub)value for adverb ADVERB from TD file FILE.

    VALUE acts as a template that selects how the data is decoded; when
    it is omitted the adverb's default from PARAMS is used.  A float
    template reads one native 'f' item.  A string template reads the
    adverb's string length rounded up to a multiple of four bytes and
    returns it with surrounding whitespace stripped.  A list template
    reads one item per element after element 0 and returns a new list
    whose element 0 is None.

    Raises AssertionError for any other template type.
    """

    assert adverb in params.output_list

    # Without an explicit template, the default value tells us the type.
    if value is None:
        value = params.default_dict[adverb]

    if type(value) == float:
        (value,) = struct.unpack('f', file.read(4))
    elif type(value) == str:
        strlen = ((params.strlen_dict[adverb] + 3) // 4) * 4
        fmt = "%ds" % strlen
        (value,) = struct.unpack(fmt, file.read(strlen))
        # strip() returns a new string; the result has to be kept,
        # otherwise the padding written into the file leaks out.
        value = value.strip()
    elif type(value) == list:
        # Element 0 is a placeholder; the data lives in value[1:].
        newvalue = [None]
        for subvalue in value[1:]:
            newvalue.append(self.__read_adverb(params, file, adverb,
                                               subvalue))
        value = newvalue
    else:
        raise AssertionError(type(value))
    return value
254
def spawn(self, name, version, userno, msgkill, isbatch, input_dict):
    """Start the task.

    Allocates a POPS number, writes the task's inputs into the matching
    slot of the TD file, records the current message counter of the
    user's message file and launches the task executable.

    NAME is the task name (upper-cased for the TD record and the
    executable path) and VERSION is passed on to _AIPSTaskParams.
    USERNO, MSGKILL and ISBATCH are written into the TD record, MSGKILL
    as MSGKILL + 32000 - 1.  INPUT_DICT maps adverb names to values and
    must hold every adverb in the task's input list.  File adverbs that
    contain a directory part are rewritten in place in INPUT_DICT to
    'area:basename', where AREA is a one-letter environment variable
    that points the task at the directory.

    Returns the task id handed out by Task.spawn.

    Raises RuntimeError if the directory of a file adverb does not
    exist.  Whatever goes wrong, the POPS number is released again
    before the exception propagates.
    """

    params = _AIPSTaskParams(name, version)
    popsno = _allocate_popsno()
    index = popsno - 1

    try:
        # TV device number; written into the TD record and used to name
        # the TVDEV environment variable below.
        ntvdev = 1

        # Hand out one-letter "areas" ('a', 'b', ...) for the
        # directories of the file adverbs.  The task gets the directory
        # through its environment and the adverb as 'area:basename'.
        env = os.environ.copy()
        area = 'a'
        for adverb in self._file_adverbs:
            if adverb not in input_dict:
                continue
            assert ord(area) <= ord('z')
            dirname = os.path.dirname(input_dict[adverb])
            if dirname:
                if not os.path.isdir(dirname):
                    msg = "Directory '%s' does not exist" % dirname
                    raise RuntimeError(msg)
                env[area] = dirname
                basename = os.path.basename(input_dict[adverb])
                input_dict[adverb] = area + ':' + basename
                area = chr(ord(area) + 1)

        env['TVDEV' + ehex(ntvdev, 2, 0)] = 'sssin:localhost'

        td_name = os.environ['DA00'] + '/TD' + AIPS.revision + '000004;'
        td_file = open(td_name, mode='r+b')
        try:
            # Slot header: 20 bytes per POPS number (8-byte name plus
            # three 4-byte integers).  'i' rather than native 'l' keeps
            # the integers at four bytes on LP64 platforms, so the
            # header cannot spill into the next slot.
            td_file.seek(index * 20)
            td_file.write(struct.pack('8s', name.upper().ljust(8)))
            td_file.write(struct.pack('i', -999))
            td_file.write(struct.pack('2i', 0, 0))

            # Parameter block: 4096 bytes per POPS number after the
            # first 1024 bytes; a 40-byte fixed part followed by the
            # input adverbs in input-list order.
            td_file.seek(1024 + index * 4096)
            td_file.write(struct.pack('i', userno))
            td_file.write(struct.pack('i', ntvdev))
            td_file.write(struct.pack('i', 0))
            td_file.write(struct.pack('i', msgkill + 32000 - 1))
            td_file.write(struct.pack('i', isbatch))
            td_file.write(struct.pack('i', 0))
            td_file.write(struct.pack('2i', 0, 0))
            td_file.write(struct.pack('f', 1.0))
            td_file.write(struct.pack('4s', '    '))
            for adverb in params.input_list:
                self.__write_adverb(params, td_file, adverb,
                                    input_dict[adverb])
        finally:
            td_file.close()

        # Create the user's message file if it does not exist yet and
        # remember its current message counter, so that only messages
        # logged from now on are attributed to this task.
        user = ehex(userno, 3, 0)
        ms_name = os.environ['DA01'] + '/MS' + AIPS.revision \
                  + user + '000.' + user + ';'
        if not os.path.exists(ms_name):
            ms_file = open(ms_name, mode='w')
            try:
                ms_file.truncate(1024)
            finally:
                ms_file.close()
            os.chmod(ms_name, 0o664)
        ms_file = open(ms_name, mode='rb')
        try:
            (msgno,) = struct.unpack('i', ms_file.read(4))
        finally:
            ms_file.close()

        path = params.version + '/' + os.environ['ARCH'] + '/LOAD/' \
               + name.upper() + ".EXE"
        tid = Task.spawn(self, path, [name.upper() + str(popsno)], env)

    except Exception:
        # Give the POPS number back, then re-raise with the original
        # traceback intact.
        _free_popsno(popsno)
        raise

    self._params[tid] = params
    self._popsno[tid] = popsno
    self._userno[tid] = userno
    self._msgkill[tid] = msgkill
    self._msgno[tid] = msgno
    return tid
345
def __read_message(self, file, msgno):
    """Read message number MSGNO from message file FILE.

    Messages are addressed as ten 100-byte records per 1024-byte block,
    starting 8 bytes into the block.  Returns the tuple (TASK, POPSNO,
    PRIORITY, MESSAGE); TASK and MESSAGE have trailing whitespace
    removed, and POPSNO and PRIORITY are unpacked from the record's
    leading integer as value // 16 and value % 16.
    """

    # divmod() floors explicitly, so the offsets stay integers even
    # where '/' means true division.
    (block, slot) = divmod(msgno, 10)
    file.seek(block * 1024 + 8 + slot * 100)
    (tmp, task, message) = struct.unpack('i8x5s3x80s', file.read(100))
    (popsno, priority) = divmod(tmp, 16)
    task = task.rstrip()
    message = message.rstrip()
    return (task, popsno, priority, message)
353
def messages(self, tid):
    """Return task's messages.

    The result is a list of (PRIORITY, TEXT) tuples.  Lines reported by
    Task.messages come first with priority 1, except those starting
    with the task's own 'NAME<popsno>' prefix, which are dropped.  They
    are followed by the messages logged in the user's message file
    since the last call that carry this task's POPS number.  The
    per-task message counter is advanced, so each logged message is
    returned only once.
    """

    messages = Task.messages(self, tid)

    # Drop lines that start with the task's own prefix.
    start = '%-5s%d' % (self._params[tid].name.upper(), self._popsno[tid])
    messages = [(1, msg) for msg in messages if not msg.startswith(start)]

    user = ehex(self._userno[tid], 3, 0)
    ms_name = os.environ['DA01'] + '/MS' + AIPS.revision \
              + user + '000.' + user + ';'

    # The message file holds packed binary records, so open it in
    # binary mode, and close it even if decoding a record fails.
    ms_file = open(ms_name, mode='rb')
    try:
        # The first integer in the file is the current message counter.
        (msgno,) = struct.unpack('i', ms_file.read(4))
        while self._msgno[tid] < msgno:
            (task, popsno, priority, msg) = \
                self.__read_message(ms_file, self._msgno[tid])
            # The file is per user; keep only this task's messages.
            if popsno == self._popsno[tid]:
                messages.append((priority,
                                 '%-5s%d: %s' % (task, popsno, msg)))
            self._msgno[tid] += 1
    finally:
        ms_file.close()

    return messages
385
def wait(self, tid):
    """Wait for the task to finish.

    The task identified by TID must already have finished.  Reads the
    task's return code and its output adverbs back from the TD file,
    releases the POPS number, drops the bookkeeping for TID and calls
    Task.wait.

    Returns a dictionary mapping each output adverb to its value.

    Raises RuntimeError if the task's return code is non-zero.  The
    POPS number is released in that case too, but the bookkeeping for
    TID is left in place.
    """

    assert self.finished(tid)

    params = self._params[tid]
    popsno = self._popsno[tid]
    index = popsno - 1

    # Same file that spawn() filled in: the name depends on the AIPS
    # revision rather than on a fixed letter.
    td_name = os.environ['DA00'] + '/TD' + AIPS.revision + '000004;'

    try:
        td_file = open(td_name, mode='rb')
        try:
            # The return code sits right after the 8-byte task name in
            # this POPS number's 20-byte slot.
            td_file.seek(index * 20 + 8)
            (result,) = struct.unpack('i', td_file.read(4))
            if result != 0:
                msg = "Task '%s' returns '%d'" % (params.name, result)
                raise RuntimeError(msg)

            # Skip the 40-byte fixed part of the parameter block; the
            # adverb values follow it.
            td_file.seek(1024 + index * 4096 + 40)
            output_dict = {}
            for adverb in params.output_list:
                output_dict[adverb] = self.__read_adverb(params, td_file,
                                                         adverb)
        finally:
            td_file.close()
    finally:
        _free_popsno(popsno)

    del self._params[tid]
    del self._popsno[tid]
    del self._userno[tid]
    del self._msgno[tid]
    # spawn() also records the message kill level; drop it as well so
    # the table does not grow with every task that is run.
    del self._msgkill[tid]
    Task.wait(self, tid)

    return output_dict
426
427
def abort(self, tid, sig=signal.SIGTERM):
    """Abort a task.

    Releases the POPS number held by task TID, drops all bookkeeping
    for it and hands TID and signal SIG to Task.abort, whose result is
    returned.
    """

    _free_popsno(self._popsno[tid])

    del self._params[tid]
    del self._popsno[tid]
    del self._userno[tid]
    del self._msgno[tid]
    # spawn() also records the message kill level; drop it as well so
    # the table does not grow with every task that is aborted.
    del self._msgkill[tid]

    return Task.abort(self, tid, sig)
439
440 pass
441
445
def _open(self, userno):
    """Open the message file of AIPS user USERNO for updating.

    The file is opened in binary read/write mode because its contents
    are packed binary data.  The caller is responsible for closing the
    returned file object.
    """
    user = ehex(userno, 3, 0)
    ms_name = os.environ['DA01'] + '/MS' + AIPS.revision \
              + user + '000.' + user + ';'
    return open(ms_name, mode='r+b')
451
def zap(self, userno):
    """Zap message log.

    Overwrites the leading integer of the message file of AIPS user
    USERNO with zero.  The rest of the file is left untouched.  Always
    returns True.
    """

    ms_file = self._open(userno)
    try:
        ms_file.write(struct.pack('i', 0))
    finally:
        # Close explicitly so the write is flushed and the handle is
        # not left to the garbage collector.
        ms_file.close()
    return True
458
459 pass
460
461
462
463
464
465
466
def _allocate_popsno():
    """Allocate a free POPS number for this process.

    POPS numbers 1 through 15 are tried in order.  A number is claimed
    by exclusively creating the marker file '/tmp/AIPS<n>.<pid>' for
    this process.  It is then only kept if no other marker file for the
    same number belongs to a process that can still be signalled.
    Marker files of processes that are gone are removed along the way.

    Returns the allocated POPS number.

    Raises RuntimeError if none of the numbers is free.
    """

    for popsno in range(1, 16):
        # Claim the number; O_EXCL makes this fail if our own marker
        # already exists, in which case the number is skipped.
        path = '/tmp/AIPS' + ehex(popsno, 1, 0) + '.' + str(os.getpid())
        try:
            fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o666)
            os.close(fd)
        except OSError:
            continue

        # Look at everybody else's marker for the same number.
        files = glob.glob('/tmp/AIPS' + ehex(popsno, 1, 0) + '.[0-9]*')
        files.remove(path)
        for marker in files:
            # Ignore files whose suffix is not a process id.
            try:
                pid = int(marker.split('.')[1])
            except (IndexError, ValueError):
                continue

            # Signal 0 only probes whether the process can be signalled.
            # NOTE(review): os.kill also fails (EPERM) for a live
            # process owned by another user; that case is treated as
            # stale here, exactly as before -- confirm this is intended.
            try:
                os.kill(pid, 0)
            except (OSError, OverflowError):
                # Stale marker; cleaning it up is best effort only.
                try:
                    os.unlink(marker)
                except OSError:
                    pass
            else:
                # The number is in use by a live process.
                break
        else:
            # No live competitor found: the number is ours.
            return popsno

        # Somebody else holds this number; withdraw our claim.
        os.unlink(path)

    raise RuntimeError("No free AIPS POPS number available on this system")
512
def _free_popsno(popsno):
    """Release POPS number POPSNO.

    Removes the marker file '/tmp/AIPS<n>.<pid>' that belongs to this
    process.  The error raised by the removal propagates if the file
    does not exist.
    """
    marker = ''.join(['/tmp/AIPS', ehex(popsno, 1, 0), '.',
                      str(os.getpid())])
    os.remove(marker)
516