Package Proxy :: Module AIPSTask
[hide private]
[frames] | no frames]

Source Code for Module Proxy.AIPSTask

  1  # Copyright (C) 2005 Joint Institute for VLBI in Europe 
  2  # 
  3  # This program is free software; you can redistribute it and/or modify 
  4  # it under the terms of the GNU General Public License as published by 
  5  # the Free Software Foundation; either version 2 of the License, or 
  6  # (at your option) any later version. 
  7  # 
  8  # This program is distributed in the hope that it will be useful, 
  9  # but WITHOUT ANY WARRANTY; without even the implied warranty of 
 10  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 11  # GNU General Public License for more details. 
 12   
 13  # You should have received a copy of the GNU General Public License 
 14  # along with this program; if not, write to the Free Software 
 15  # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA 
 16   
 17  """ 
 18   
 19  This module provides the bits and pieces to implement an AIPSTask 
 20  proxy object. 
 21   
 22  """ 
 23   
 24  # Global AIPS defaults. 
 25  from AIPSUtil import ehex 
 26  from Proxy.AIPS import AIPS 
 27   
 28  # The results from parsing POPSDAT.HLP. 
 29  from Proxy.Popsdat import Popsdat 
 30   
 31  # Bits from the generic Task implementation. 
 32  from Proxy.Task import Task 
 33   
 34  # Generic Python stuff. 
 35  import glob, os, pickle, signal, struct 
 36   
class _AIPSTaskParams:
    """Parameter description of one AIPS task.

    The description is obtained by parsing the task's HELP file
    (``<version>/HELP/<NAME>.HLP``) together with the POPSDAT defaults,
    and is cached as a pickle in
    ``$HOME/.ParselTongue/<version>/<name>.pickle`` so that the HELP
    file only has to be parsed once.

    Instance attributes:

    name            -- task name as passed to the constructor.
    version         -- directory of the AIPS version in use.
    default_dict    -- maps adverb to its default value (from POPSDAT).
    input_list      -- input adverbs, in HELP file order.
    output_list     -- output adverbs, in HELP file order.
    min_dict        -- maps adverb to its lower limit, if one was given.
    max_dict        -- maps adverb to its upper limit, if one was given.
    strlen_dict     -- maps string adverbs to their length.
    help_string     -- text of the HELP section.
    explain_string  -- text of the EXPLAIN section.

    NOTE: this deliberately stays a plain (non-``object``) class; the
    instance is accessed through ``__dict__`` by ``__getitem__`` for the
    benefit of XML-RPC.
    """

    # Attributes stored in the cache file, in the order in which they
    # are pickled.  Do not reorder: existing cache files depend on it.
    _cached_attrs = ('default_dict', 'input_list', 'output_list',
                     'min_dict', 'max_dict', 'strlen_dict',
                     'help_string', 'explain_string')

    def __parse_inputs(self, lines, popsdat):
        """Parse the INPUTS section read from the iterator LINES.

        Consumes LINES up to and including the line of dashes that
        terminates the section, and fills in the adverb dictionaries
        and lists.  Raises AttributeError if a (misspelt) adverb is a
        prefix of more than one POPSDAT adverb.
        """

        # Pretend we know nothing yet.
        task = None
        desc = None
        min_start = min_end = max_start = max_end = -1

        for line in lines:
            # A line of dashes terminates the parameter definitions.
            if line.startswith('--------'):
                break

            # Skip comment lines (';'), empty lines ('\n') and
            # continuation lines (' ').
            if line[:1] in (';', '\n', ' '):
                continue

            if not task:
                # The header line marks the columns that hold the lower
                # and upper limits with runs of 'L' and 'U'.
                min_start = line.find('LLLLLLLLLLLL')
                min_end = line.rfind('L')
                max_start = line.find('UUUUUUUUUUUU')
                max_end = line.rfind('U')
                if min_start != -1 and max_start != -1:
                    task = line.split()[0]
                continue

            if not desc:
                # Everything up to the one-line task description is
                # ignored.
                if line.startswith(task):
                    desc = line
                continue

            adverb = line.split()[0].lower()

            # The direction code sits in the column just before the
            # lower limit; a missing code counts as a blank.
            code = line[min_start - 1:min_start] or ' '

            # Limits are only recorded if both of them parse.
            try:
                lower = float(line[min_start:min_end])
                upper = float(line[max_start:max_end])
            except ValueError:
                lower = None
                upper = None

            match_key = None
            if adverb in popsdat.default_dict:
                match_key = adverb
            else:
                # Some HELP files contain typos.
                for key in popsdat.default_dict:
                    if key.startswith(adverb):
                        if match_key:
                            msg = "adverb '%s' is ambiguous" % adverb
                            raise AttributeError(msg)
                        match_key = key
            if not match_key:
                # NOTE(review): historical fallback -- an adverb that
                # matches nothing in POPSDAT silently inherits the
                # default of whichever key the loop above visited
                # last.  Kept as is because HELP files may rely on it;
                # confirm whether this should be an error instead.
                match_key = key
            self.default_dict[adverb] = popsdat.default_dict[match_key]

            if code in ' *&$':
                self.input_list.append(adverb)
            if code in '&%$@':
                self.output_list.append(adverb)
            # Look the string length up under the matched key, so that
            # a misspelt adverb gets the length that belongs to the
            # default value it was given above.
            if match_key in popsdat.strlen_dict:
                self.strlen_dict[adverb] = popsdat.strlen_dict[match_key]
            if lower is not None:
                self.min_dict[adverb] = lower
            if upper is not None:
                self.max_dict[adverb] = upper

    def __parse(self, name):
        """Determine the proper attributes for the AIPS task NAME by
        parsing its HELP file."""

        popsdat = Popsdat(self.version)

        path = self.version + '/HELP/' + name.upper() + '.HLP'
        help_file = open(path)
        try:
            # The three sections are read from the same iterator, each
            # one continuing where the previous one stopped.
            self.__parse_inputs(help_file, popsdat)

            # Parse HELP section; a line of dashes terminates it.
            help_lines = []
            for line in help_file:
                if line.startswith('--------'):
                    break
                help_lines.append(line)
            self.help_string = self.help_string + ''.join(help_lines)

            # Parse EXPLAIN section: everything that is left.
            self.explain_string = self.explain_string + ''.join(help_file)
        finally:
            help_file.close()

    def __load_cache(self, path):
        """Return the list of cached attribute values stored in PATH.

        Raises IOError if the file cannot be opened, and EOFError or
        pickle.UnpicklingError if it is truncated or corrupt.
        """
        # NOTE: unpickling executes whatever the file describes; the
        # cache lives in the user's own home directory and must not be
        # replaced by files from an untrusted source.
        cache = open(path, 'rb')
        try:
            unpickler = pickle.Unpickler(cache)
            return [unpickler.load() for attr in self._cached_attrs]
        finally:
            cache.close()

    def __store_cache(self, path):
        """Write the cached attributes to PATH, creating its directory."""
        dirname = os.path.dirname(path)
        # Make sure the directory exists.
        if not os.path.exists(dirname):
            os.makedirs(dirname)

        cache = open(path, 'wb')
        try:
            pickler = pickle.Pickler(cache)
            for attr in self._cached_attrs:
                pickler.dump(getattr(self, attr))
        finally:
            cache.close()

    def __init__(self, name, version):
        """Set up the parameters of task NAME for AIPS version VERSION.

        VERSION is either the name of an environment variable holding
        the version directory, or a directory name relative to
        $AIPS_ROOT.  Requires HOME (and AIPS_ROOT in the second case)
        to be set in the environment; a missing variable raises
        KeyError.
        """
        self.default_dict = {}
        self.input_list = []
        self.output_list = []
        self.min_dict = {}
        self.max_dict = {}
        self.strlen_dict = {}
        self.help_string = ''
        self.explain_string = ''

        self.name = name
        if version in os.environ:
            self.version = os.environ[version]
        else:
            self.version = os.environ['AIPS_ROOT'] + '/' + version

        path = os.environ['HOME'] + '/.ParselTongue/' \
               + os.path.basename(self.version) + '/' \
               + name.lower() + '.pickle'

        try:
            # Load everything before touching any attribute, so that a
            # truncated cache cannot leave half-filled lists behind for
            # the parser to append to.
            cached = self.__load_cache(path)
        except (IOError, EOFError, pickle.UnpicklingError):
            # No usable cache: parse the HELP file and cache the result.
            self.__parse(name)
            self.__store_cache(path)
        else:
            for attr, value in zip(self._cached_attrs, cached):
                setattr(self, attr, value)

    # Provide a dictionary-like interface to deal with the
    # idiosyncrasies of XML-RPC.
    def __getitem__(self, key):
        """Return the instance attribute named KEY (KeyError if absent)."""
        return self.__dict__[key]
191 192
class AIPSTask(Task):
    """Proxy object that starts AIPS tasks and collects their results.

    Adverb values are handed to a task, and read back from it, through
    the TD file in $DA00; messages are read from the user's message
    file in $DA01.  Every running task is identified by the task id
    returned by spawn().
    """

    # List of adverbs referring to file names.
    _file_adverbs = ['infile', 'infile2', 'outfile', 'outprint',
                     'ofmfile', 'boxfile', 'oboxfile']

    def __init__(self):
        Task.__init__(self)
        # Bookkeeping for running tasks, all keyed by task id.
        self._params = {}       # _AIPSTaskParams of the task
        self._popsno = {}       # POPS number allocated to the task
        self._userno = {}       # AIPS user number the task runs under
        self._msgno = {}        # number of message file entries seen
        self._msgkill = {}      # msgkill value the task was started with

    def params(self, name, version):
        """Return parameter set for version VERSION of task NAME."""
        return _AIPSTaskParams(name, version)

    def __td_name(self):
        """Return the name of the TD file.

        Shared by spawn() and wait() so that both always use the same
        file.  NOTE(review): wait() used to hard-code 'TDD000004;',
        which is the same name only if AIPS.revision is 'D' -- confirm.
        """
        return os.environ['DA00'] + '/TD' + AIPS.revision + '000004;'

    def __ms_name(self, userno):
        """Return the name of the message file of AIPS user USERNO."""
        user = ehex(userno, 3, 0)
        return os.environ['DA01'] + '/MS' + AIPS.revision \
               + user + '000.' + user + ';'

    def __forget(self, tid):
        """Drop all bookkeeping kept for task TID."""
        del self._params[tid]
        del self._popsno[tid]
        del self._userno[tid]
        del self._msgno[tid]
        self._msgkill.pop(tid, None)

    def __write_adverb(self, params, td_file, adverb, value):
        """Write (sub)value VALUE of adverb ADVERB into TD file TD_FILE.

        Floats are written as a native 'f', strings are blank-padded to
        the adverb's string length rounded up to a multiple of four,
        and lists are written element by element, skipping element 0.
        Any other type raises AssertionError.
        """

        assert(adverb in params.input_list)

        if isinstance(value, float):
            td_file.write(struct.pack('f', value))
        elif isinstance(value, str):
            strlen = ((params.strlen_dict[adverb] + 3) // 4) * 4
            fmt = "%ds" % strlen
            td_file.write(struct.pack(fmt, value.ljust(strlen)))
        elif isinstance(value, list):
            for subvalue in value[1:]:
                self.__write_adverb(params, td_file, adverb, subvalue)
        else:
            raise AssertionError(type(value))

    def __read_adverb(self, params, td_file, adverb, value=None):
        """Read (sub)value for adverb ADVERB from TD file TD_FILE.

        VALUE is only used to determine the type to read; it defaults
        to the adverb's default value.  Returns the value read, with
        the padding stripped from strings and with lists rebuilt around
        a None placeholder in element 0.
        """

        assert(adverb in params.output_list)

        # We use the default value for type checks.
        if value is None:
            value = params.default_dict[adverb]

        if isinstance(value, float):
            (value,) = struct.unpack('f', td_file.read(4))
        elif isinstance(value, str):
            strlen = ((params.strlen_dict[adverb] + 3) // 4) * 4
            fmt = "%ds" % strlen
            (value,) = struct.unpack(fmt, td_file.read(strlen))
            # strip() returns a new string; the result used to be
            # thrown away, leaving the padding in place.
            value = value.strip()
        elif isinstance(value, list):
            newvalue = [None]
            for subvalue in value[1:]:
                newvalue.append(self.__read_adverb(params, td_file,
                                                   adverb, subvalue))
            value = newvalue
        else:
            raise AssertionError(type(value))
        return value

    def spawn(self, name, version, userno, msgkill, isbatch, input_dict):
        """Start the task.

        Writes the adverb values in INPUT_DICT into the TD slot of a
        freshly allocated POPS number, makes sure the message file of
        user USERNO exists, and starts version VERSION of task NAME.
        Returns the task id.  Raises RuntimeError if a file adverb
        refers to a directory that does not exist; on any failure the
        POPS number is released again.
        """

        params = _AIPSTaskParams(name, version)
        popsno = _allocate_popsno()
        index = popsno - 1

        try:
            # A single hardcoded TV will do until support for multiple
            # TVs is implemented.
            ntvdev = 1

            # The file adverbs are rewritten below; work on a copy so
            # the caller's dictionary is left alone.
            input_dict = dict(input_dict)

            # Construct the environment for the task.  For the adverbs
            # like 'infile', 'outfile' and 'outprint', we split off
            # the directory component of the pathname and use that as
            # the area.
            env = os.environ.copy()
            area = 'a'
            for adverb in self._file_adverbs:
                if adverb not in input_dict:
                    continue
                assert(ord(area) <= ord('z'))
                dirname = os.path.dirname(input_dict[adverb])
                if not dirname:
                    continue
                if not os.path.isdir(dirname):
                    msg = "Directory '%s' does not exist" % dirname
                    raise RuntimeError(msg)
                env[area] = dirname
                basename = os.path.basename(input_dict[adverb])
                input_dict[adverb] = area + ':' + basename
                area = chr(ord(area) + 1)

            # Send output to the TV running on this machine.
            env['TVDEV' + ehex(ntvdev, 2, 0)] = 'sssin:localhost'

            td_file = open(self.__td_name(), mode='r+b')
            try:
                # Task control entry: 20 bytes per POPS number.  Use
                # 'i' (4 bytes) and not 'l', which is 8 bytes on LP64
                # platforms and would run into the next entry; wait()
                # reads the result back as 'i' as well.
                td_file.seek(index * 20)
                td_file.write(struct.pack('8s', name.upper().ljust(8)))
                td_file.write(struct.pack('i', -999))
                td_file.write(struct.pack('2i', 0, 0))

                # 40 byte header, followed by the input adverbs.
                td_file.seek(1024 + index * 4096)
                td_file.write(struct.pack('i', userno))
                td_file.write(struct.pack('i', ntvdev))
                td_file.write(struct.pack('i', 0))
                td_file.write(struct.pack('i', msgkill + 32000 - 1))
                td_file.write(struct.pack('i', isbatch))
                td_file.write(struct.pack('i', 0))
                td_file.write(struct.pack('2i', 0, 0))
                td_file.write(struct.pack('f', 1.0))
                td_file.write(struct.pack('4s', '    '))
                for adverb in params.input_list:
                    self.__write_adverb(params, td_file, adverb,
                                        input_dict[adverb])
            finally:
                td_file.close()

            # Create the message file if necessary and record the
            # number of messages currently in it.
            ms_name = self.__ms_name(userno)
            if not os.path.exists(ms_name):
                ms_file = open(ms_name, mode='wb')
                try:
                    ms_file.truncate(1024)
                finally:
                    ms_file.close()
                os.chmod(ms_name, 0o664)
            ms_file = open(ms_name, mode='rb')
            try:
                (msgno,) = struct.unpack('i', ms_file.read(4))
            finally:
                ms_file.close()

            path = params.version + '/' + os.environ['ARCH'] + '/LOAD/' \
                   + name.upper() + ".EXE"
            tid = Task.spawn(self, path, [name.upper() + str(popsno)], env)

        except Exception:
            _free_popsno(popsno)
            # Bare raise keeps the original traceback.
            raise

        self._params[tid] = params
        self._popsno[tid] = popsno
        self._userno[tid] = userno
        self._msgkill[tid] = msgkill
        self._msgno[tid] = msgno
        return tid

    def __read_message(self, ms_file, msgno):
        """Read entry MSGNO from message file MS_FILE.

        Returns the tuple (task, popsno, priority, message).
        """
        # Ten 100 byte entries per 1024 byte block, after 8 bytes.
        ms_file.seek((msgno // 10) * 1024 + 8 + (msgno % 10) * 100)
        (tmp, task, message) = struct.unpack('i8x5s3x80s', ms_file.read(100))
        (popsno, priority) = (tmp // 16, tmp % 16)
        task = task.rstrip()
        message = message.rstrip()
        return (task, popsno, priority, message)

    def messages(self, tid):
        """Return task's messages.

        The result is a list of (priority, text) tuples: first the
        output collected by Task.messages() that is not a formal
        message, with priority 1, followed by the entries added to the
        message file for this task's POPS number since the last call.
        """

        # Make sure we read the messages, even if we throw them away
        # later to prevent the task from blocking.
        messages = Task.messages(self, tid)

        # Strip out all formal messages.
        start = '%-5s%d' % (self._params[tid].name.upper(), self._popsno[tid])
        messages = [(1, msg) for msg in messages
                    if not msg.startswith(start)]

        ms_file = open(self.__ms_name(self._userno[tid]), mode='rb')
        try:
            (msgno,) = struct.unpack('i', ms_file.read(4))
            while self._msgno[tid] < msgno:
                (task, popsno, priority, msg) = \
                       self.__read_message(ms_file, self._msgno[tid])
                # Filter
                if popsno == self._popsno[tid]:
                    messages.append((priority,
                                     '%-5s%d: %s' % (task, popsno, msg)))
                self._msgno[tid] += 1
        finally:
            ms_file.close()

        return messages

    def wait(self, tid):
        """Wait for the task to finish.

        The task must already have finished (this is asserted).
        Returns a dictionary with the values of the output adverbs.
        Raises RuntimeError if the task recorded a non-zero result.
        The POPS number is released in either case.
        """

        assert(self.finished(tid))

        params = self._params[tid]
        popsno = self._popsno[tid]
        index = popsno - 1

        try:
            td_file = open(self.__td_name(), mode='rb')
            try:
                td_file.seek(index * 20 + 8)
                (result,) = struct.unpack('i', td_file.read(4))
                if result != 0:
                    # NOTE(review): on this path the bookkeeping for
                    # TID is left in place and Task.wait() is not
                    # called, exactly as before; confirm whether the
                    # task should be forgotten here as well.
                    msg = "Task '%s' returns '%d'" % (params.name, result)
                    raise RuntimeError(msg)

                # Skip the 40 byte header written by spawn().
                td_file.seek(1024 + index * 4096 + 40)
                output_dict = {}
                for adverb in params.output_list:
                    output_dict[adverb] = \
                           self.__read_adverb(params, td_file, adverb)
            finally:
                td_file.close()
        finally:
            _free_popsno(popsno)

        self.__forget(tid)
        Task.wait(self, tid)

        return output_dict

    # AIPS seems to ignore SIGINT, so use SIGTERM instead.
    def abort(self, tid, sig=signal.SIGTERM):
        """Abort a task.

        Releases the task's POPS number, forgets about the task and
        passes signal SIG on to Task.abort(), whose result is returned.
        """

        _free_popsno(self._popsno[tid])
        self.__forget(tid)

        return Task.abort(self, tid, sig)

    pass                                # class AIPSTask
441
class AIPSMessageLog:
    """Proxy object giving access to the AIPS message file of a user."""

    def __init__(self):
        return

    def _open(self, userno):
        """Open the message file of AIPS user USERNO for updating.

        The file lives in $DA01.  It is opened in binary mode since its
        contents are packed with struct.  The caller has to close the
        returned file object.
        """
        user = ehex(userno, 3, 0)
        ms_name = os.environ['DA01'] + '/MS' + AIPS.revision \
                  + user + '000.' + user + ';'
        return open(ms_name, mode='r+b')

    def zap(self, userno):
        """Zap message log.

        Resets the count stored in the first four bytes of the message
        file of user USERNO to zero.  Always returns True.
        """

        ms_file = self._open(userno)
        try:
            ms_file.write(struct.pack('i', 0))
        finally:
            # Close explicitly so the new count reaches the file now
            # instead of whenever the object is garbage collected.
            ms_file.close()
        return True                # Return something other than None.

    pass                                # class AIPSMessageLog
460 461 462 # In order to prevent multiple AIPS instances from using the same POPS 463 # number, every AIPS instance creates a lock file in /tmp. These lock 464 # files are named AIPSx.yyy, where x is the POPS number (in extended 465 # hex) and yyy is the process ID of the AIPS instance. 466
def _allocate_popsno():
    """Allocate a free AIPS POPS number and lock it for this process.

    POPS numbers 1 up to and including 15 are tried in turn.  A number
    is claimed by creating the lock file /tmp/AIPSx.yyy (x: the POPS
    number in extended hex, yyy: our process ID); it is ours if no
    other lock file for the same number belongs to a process that is
    still alive.  Lock files of dead processes are removed on the way.

    Returns the POPS number.  Raises RuntimeError if none is free.
    Release the number again with _free_popsno().
    """
    import errno                # Only needed here.

    own_suffix = '.' + str(os.getpid())

    for popsno in range(1, 16):
        prefix = '/tmp/AIPS' + ehex(popsno, 1, 0)
        path = prefix + own_suffix

        # In order to prevent a race, first create a lock file for
        # POPSNO.
        try:
            fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o666)
        except OSError:
            # Most likely we hold this POPS number already.
            continue
        os.close(fd)

        # Get a list of likely lock files and iterate over them.
        # Leave out our own lock file though.
        in_use = False
        for lock in glob.glob(prefix + '.[0-9]*'):
            if lock == path:
                continue

            # If the part after the dot isn't an integer, it's not a
            # proper lock file.
            try:
                pid = int(lock.split('.')[1])
            except (ValueError, IndexError):
                continue

            # Check whether the AIPS instance is still alive.
            try:
                os.kill(pid, 0)
            except OverflowError:
                # Not a valid process ID at all.
                alive = False
            except OSError as err:
                # EPERM means the process exists but belongs to
                # somebody else: the POPS number is taken.
                alive = (err.errno == errno.EPERM)
            else:
                alive = True

            if alive:
                # The POPS number is in use.
                in_use = True
                break

            # The POPS number is no longer in use.  Try to clean up
            # the lock file.  This might fail though if we don't own
            # it.
            try:
                os.unlink(lock)
            except OSError:
                pass

        if not in_use:
            # The POPS number is still free.
            return popsno

        # Clean up our own mess.
        os.unlink(path)

    raise RuntimeError("No free AIPS POPS number available on this system")
512
def _free_popsno(popsno):
    """Release POPS number POPSNO held by this process.

    Removes the lock file /tmp/AIPSx.yyy, where x is POPSNO in extended
    hex and yyy is our process ID.  An OSError from the removal (for
    instance because the lock file does not exist) is passed on to the
    caller.
    """
    pieces = ['/tmp/AIPS', ehex(popsno, 1, 0), '.', str(os.getpid())]
    os.remove(''.join(pieces))
516