Package Proxy :: Module ObitTask
[hide private]
[frames | no frames]

Source Code for Module Proxy.ObitTask

  1  # Copyright (C) 2005 Associated Universities, Inc. Washington DC, USA. 
  2  # Copyright (C) 2005 Joint Institute for VLBI in Europe 
  3  # 
  4  # This program is free software; you can redistribute it and/or modify 
  5  # it under the terms of the GNU General Public License as published by 
  6  # the Free Software Foundation; either version 2 of the License, or 
  7  # (at your option) any later version. 
  8  # 
  9  # This program is distributed in the hope that it will be useful, 
 10  # but WITHOUT ANY WARRANTY; without even the implied warranty of 
 11  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 12  # GNU General Public License for more details. 
 13   
 14  # You should have received a copy of the GNU General Public License 
 15  # along with this program; if not, write to the Free Software 
 16  # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA 
 17  # 
 18  #  Correspondence concerning this software should be addressed as follows: 
 19  #         Internet email: bcotton@nrao.edu. 
 20  #         Postal address: William Cotton 
 21  #                         National Radio Astronomy Observatory 
 22  #                         520 Edgemont Road 
 23  #                         Charlottesville, VA 22903-2475 USA 
 24  #----------------------------------------------------------------------- 
 25  # 
 26  #                   Obit tasking interface 
 27  # 
 28  #  This module contains classes useful for an Obit tasking interface to python. 
 29  #  An ObitTask object contains input parameters for a given Obit program. 
 30  #     The parameters for a given task are defined in a Task Definition File 
 31  #  (TDF) which gives the order, names, types, ranges and dimensionalities. 
 32  #  A TDF is patterned after AIPS HELP files. 
 33  #     The Task Definition File can be derived from the AIPS Help file with the 
 34  #  addition of: 
 35  #   - A line before the beginning of each parameter definition of the form: 
 36  #   **PARAM** [type] [dim] 
 37  #       where [type] is float or str (string) and [dim] is the  
 38  #       dimensionality as a blank separated list of integers, e.g. 
 39  #       **PARAM** str 12 5       (5 strings of 12 characters) 
 40  #   HINT: No matter what POPS thinks, all strings are multiples of 4 characters 
 41  #   For non AIPS usage dbl (double), int (integer=long), boo (boolean) 
 42  #   are defined. 
 43  # 
 44  #----------------------------------------------------------------------- 
 45   
 46  # Bits from AIPS. 
 47  from AIPSUtil import ehex 
 48   
 49  # Bits from the generic Task implementation. 
 50  from Proxy.Task import Task 
 51   
 52  # Generic Python stuff. 
 53  import fcntl, glob, os, pickle, select, struct, string, pty 
 54   
 55   
class _ObitTaskParams:
    """Adverb (parameter) tables for one Obit task.

    The tables are read from the task's Task Definition File
    ``$OBIT/TDF/<name>.TDF`` and cached as a sequence of pickles in
    ``$HOME/.ParselTongue/<version>/<name>.pickle``.

    Attributes:
        name: task name as given to the constructor.
        version: version string; for 'OLD', 'NEW' and 'TST' it is the
            basename of the environment variable of that name.
        default_dict: adverb -> default value (a scalar when the adverb
            holds exactly one value, otherwise a list).
        input_list: input adverbs, in TDF order.
        output_list: output adverbs, in TDF order.
        min_dict, max_dict: adverb -> float limit, present only when
            both limits could be parsed from the adverb line.
        strlen_dict: adverb -> string length, for string adverbs.
        dim_dict: adverb -> list of integer dimensions.
        help_string: text of the HELP section of the TDF.
    """

    def __parse(self, name):
        """Determine the proper attributes for the Obit task NAME by
        parsing its TDF file.

        Raises KeyError if $OBIT is not set and IOError if the TDF
        cannot be opened.
        """

        task = None
        strlen = None
        deff = [0]
        gotDesc = False

        path = os.environ['OBIT'] + '/TDF/' + name + '.TDF'
        with open(path) as tdf:
            for line in tdf:
                # A line of dashes terminates the parameter definitions.
                if line.startswith('--------'):
                    break

                # Comment lines start with ';'.
                if line.startswith(';'):
                    continue

                # The first remaining line is the header: it names the
                # task and its runs of 'L' and 'U' mark the columns that
                # hold the lower and upper limits on the adverb lines.
                if not task:
                    task = line.split()[0]
                    min_start = line.find('LLLLLLLLLLLL')
                    min_end = line.rfind('L')
                    max_start = line.find('UUUUUUUUUUUU')
                    max_end = line.rfind('U')
                    continue

                if line.startswith(task):
                    continue

                if line.startswith(' ') or line.startswith('\n'):
                    continue

                # Description of parameter?
                if line.startswith('**PARAM**'):
                    gotDesc = True
                    parts = line.split()
                    kind = parts[1]

                    # Dimensionality and total number of elements.
                    dim = [int(x) for x in parts[2:]]
                    total = 1
                    for extent in dim:
                        total *= extent
                    # Want number of strings, not number of characters.
                    # Floor division keeps this an integer on Python 3.
                    if kind == 'str':
                        total = total // dim[0]

                    # Default value by type.
                    # NOTE(review): any other type name (e.g. 'dbl')
                    # falls through and keeps the default and string
                    # length of the previous parameter - confirm
                    # whether such types need their own defaults.
                    if kind == 'float':
                        strlen = None
                        deff = total * [0.0]
                    elif kind == 'str':
                        strlen = dim[0]
                        deff = total * [strlen * ' ']
                    elif kind == 'int':
                        strlen = None
                        deff = total * [0]
                    elif kind == 'boo':
                        strlen = None
                        deff = total * [False]

                # If just parsed PARAM line get parameter.
                elif gotDesc:
                    gotDesc = False
                    adverb = line.split()[0]
                    # Single character just before the limits column;
                    # it decides whether the adverb is input, output
                    # or both.
                    code = line[min_start - 1:min_start]
                    if not code:
                        code = ' '
                    # Limits are optional; keep them only when both
                    # columns hold numbers.
                    try:
                        lower = float(line[min_start:min_end].strip())
                        upper = float(line[max_start:max_end].strip())
                    except ValueError:
                        lower = None
                        upper = None

                    # Assume type/dimension is one just read.
                    # If only one entry, convert deff to scalar.
                    if len(deff) == 1:
                        deff = deff[0]
                    self.default_dict[adverb] = deff  # default
                    self.dim_dict[adverb] = dim       # dimensionality
                    if code in ' *&$' or len(adverb) > 9:
                        self.input_list.append(adverb)
                    if code in '&%$@':
                        self.output_list.append(adverb)
                    if strlen:
                        self.strlen_dict[adverb] = strlen
                    if lower is not None:
                        self.min_dict[adverb] = lower
                    if upper is not None:
                        self.max_dict[adverb] = upper

            # Parse HELP section: everything up to the next line of
            # dashes.
            help_lines = []
            for line in tdf:
                if line.startswith('--------'):
                    break
                help_lines.append(line)
            self.help_string = self.help_string + ''.join(help_lines)

    def __init__(self, name, version):
        """Load the tables for task NAME, version VERSION.

        The pickle cache is used when it can be read completely;
        otherwise the TDF is parsed and the cache is (re)written.
        Raises KeyError when a required environment variable ($HOME,
        $OBIT, or VERSION itself for 'OLD'/'NEW'/'TST') is missing.
        """
        self.default_dict = {}
        self.input_list = []
        self.output_list = []
        self.min_dict = {}
        self.max_dict = {}
        self.strlen_dict = {}
        self.help_string = ''
        self.dim_dict = {}

        self.name = name
        if version in ['OLD', 'NEW', 'TST']:
            self.version = os.path.basename(os.environ[version])
        else:
            self.version = version

        path = os.environ['HOME'] + '/.ParselTongue/' \
            + self.version + '/' + name + '.pickle'

        # NOTE: unpickling executes whatever the file describes; the
        # cache lives under the user's own $HOME and is trusted here.
        try:
            # Load everything into a temporary first so a truncated or
            # corrupt cache cannot leave half-filled tables behind for
            # the parser to append to.
            with open(path, 'rb') as cache:
                unpickler = pickle.Unpickler(cache)
                cached = [unpickler.load() for _ in range(8)]
        except (IOError, EOFError, pickle.UnpicklingError):
            self.__parse(name)

            # Make sure the directory exists.
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))

            # Binary mode is required by pickle on Python 3 and is
            # harmless on Python 2.
            with open(path, 'wb') as cache:
                pickler = pickle.Pickler(cache)
                for table in (self.default_dict, self.input_list,
                              self.output_list, self.min_dict,
                              self.max_dict, self.strlen_dict,
                              self.dim_dict, self.help_string):
                    pickler.dump(table)
        else:
            (self.default_dict, self.input_list, self.output_list,
             self.min_dict, self.max_dict, self.strlen_dict,
             self.dim_dict, self.help_string) = cached

    # Provide a dictionary-like interface to deal with the
    # idiosyncrasies of XML-RPC.
    def __getitem__(self, key):
        """Return the instance attribute named KEY (KeyError if absent)."""
        return self.__dict__[key]
223 224
class ObitTask(Task):
    """Run Obit tasks on top of the generic Task implementation.

    Input adverbs are handed to the task through a text file
    ``/tmp/<name>Input.<popsno>`` and results are read back from
    ``/tmp/<name>Output.<popsno>``.  For every running task the
    parameter tables and the allocated POPS number are remembered
    under the task id returned by Task.spawn().
    """

    def __init__(self):
        Task.__init__(self)
        self._params = {}   # tid -> _ObitTaskParams
        self._popsno = {}   # tid -> POPS number

    def params(self, name, version):
        """Return parameter set for version VERSION of task NAME."""
        return _ObitTaskParams(name, version)

    def __write_adverb(self, params, file, adverb, value):
        """Write one adverb to the Obit input text file.

        A header line ``$Key = <adverb> <Flt|Str|Boo|Int> (<dims>)`` is
        written, followed by the data.  VALUE may be a scalar or a list;
        the type of a list is taken from its first element.  Raises
        AssertionError for an adverb that is not an input adverb or a
        value type other than float, str, bool or int.
        """

        assert(adverb in params.input_list)

        # Get type, may be scalar or list.
        is_list = type(value) == list
        if is_list:
            dtype = type(value[0])
        else:
            dtype = type(value)

        # Convert to string for numeric types.
        if is_list:
            data = ' '.join(map(str, value))
        else:
            data = str(value)

        # Dimensionality: the second dimension is only written when it
        # is larger than one.
        dim = params.dim_dict[adverb]
        dimStr = "(" + str(dim[0]) + ")"
        if len(dim) > 1 and dim[1] > 1:
            dimStr = "(" + str(dim[0]) + "," + str(dim[1]) + ")"

        # Exact type comparisons on purpose: bool is a subclass of int
        # and must not be written as Int.
        if dtype == float:
            file.write("$Key = " + adverb + " Flt " + dimStr + "\n")
            file.write(data + "\n")
        elif dtype == str:
            file.write("$Key = " + adverb + " Str " + dimStr + "\n")
            if is_list:
                # One string per line.
                for x in value:
                    file.write(x + "\n")
            else:
                file.write(value + "\n")
        elif dtype == bool:
            file.write("$Key = " + adverb + " Boo " + dimStr + "\n")
            if is_list:
                flags = value
            else:
                flags = [value]
            # All flags go on a single line as " T" / " F".
            for x in flags:
                if x:
                    file.write(" T")
                else:
                    file.write(" F")
            file.write("\n")
        elif dtype == int:
            file.write("$Key = " + adverb + " Int " + dimStr + "\n")
            file.write(data + "\n")
        else:
            raise AssertionError(type(value))

    def __read_adverb(self, params, file, adverb):
        """Read the value of ADVERB from the lines of an output file.

        FILE is any iterable of text lines.  The header line is expected
        to split into ``$Key <adverb> <type> <dim> ...``; one value per
        line follows.  Returns a scalar when exactly one value was read,
        otherwise a list (empty when the adverb was not found).

        NOTE(review): the header looked for here ("$Key <adverb>")
        differs from the "$Key = <adverb>" header written to the input
        file - confirm the actual layout of Obit output files.
        """

        assert(adverb in params.output_list)

        gotIt = False     # Not yet found entry
        count = 0         # No values parsed yet
        total = 1
        dtype = None
        value = []        # To accept
        for line in file:
            # Look for header for parameter.
            if line.startswith("$Key " + adverb):
                gotIt = True
                parts = line.split()
                # How many values.
                total = 1
                for x in parts[3:]:
                    total *= int(x)
                dtype = parts[2]
                # For strings the first dimension is the string length;
                # we want the number of strings, not of characters.
                if dtype == 'Str' and len(parts) > 3:
                    total = total // int(parts[3])

            # Read data one value per line after the header.
            elif gotIt:
                if dtype == 'Flt':
                    value.append(float(line))
                elif dtype == 'Dbl':
                    value.append(float(line))
                elif dtype == 'Str':
                    value.append(line)
                elif dtype == 'Int':
                    value.append(int(line))
                elif dtype == 'Boo':
                    if line.startswith('T'):
                        value.append(True)
                    else:
                        value.append(False)
                count = count + 1

            if gotIt and count >= total:  # Finished?
                break

        # Convert to scalar if only one.
        if len(value) == 1:
            value = value[0]
        return value

    def spawn(self, name, version, userno, msgkill, isbatch, input_dict):
        """Start the task and return its task id.

        Writes all input adverbs from INPUT_DICT to the input text file,
        optionally keeps a hard link to it (when INPUT_DICT['DEBUG'] is
        true) and starts ``$OBIT/bin/$ARCH/<name>``.
        """

        params = _ObitTaskParams(name, version)
        popsno = _allocate_popsno()

        try:
            # Set input and output text parameter/result files.
            tmpInput = "/tmp/" + params.name + "Input." + str(popsno)
            tmpOutput = "/tmp/" + params.name + "Output." + str(popsno)

            in_file = open(tmpInput, mode="w")
            try:
                for adverb in params.input_list:
                    self.__write_adverb(params, in_file, adverb,
                                        input_dict[adverb])
            finally:
                in_file.close()

            # If debugging add a link to the input file to preserve it.
            if input_dict['DEBUG']:
                tmpDebug = tmpInput + 'Dbg'
                if os.access(tmpDebug, os.F_OK):
                    os.unlink(tmpDebug)      # Remove any old version file.
                os.link(tmpInput, tmpDebug)  # Add new link.
                # Tell about it.
                print("Saving copy of Obit task input in " + tmpDebug)

            path = os.environ['OBIT'] + '/bin/' + os.environ['ARCH'] \
                + '/' + name
            arglist = [name, "-input", tmpInput, "-output", tmpOutput,
                       "-pgmNumber", str(popsno), "-AIPSuser", str(userno)]
            tid = Task.spawn(self, path, arglist)
        except Exception:
            # The task never got registered, so wait() will not release
            # the POPS number for us; do it here.
            _free_popsno(popsno)
            raise

        self._params[tid] = params
        self._popsno[tid] = popsno
        return tid

    def messages(self, tid):
        """Return task's messages as (priority, message) pairs."""

        # Add a default priority to the messages.
        return [(1, msg) for msg in Task.messages(self, tid)]

    def wait(self, tid):
        """Wait for the task to finish.

        The task must already be finished.  Returns a dictionary with
        the values of all output adverbs, removes the temporary input
        and output files and releases the POPS number.
        """

        assert(self.finished(tid))

        params = self._params[tid]
        popsno = self._popsno[tid]

        tmpInput = "/tmp/" + params.name + "Input." + str(popsno)
        tmpOutput = "/tmp/" + params.name + "Output." + str(popsno)

        output_dict = {}
        if params.output_list:
            # Order of entries in the file is not specified, so every
            # adverb scans from the top; the file itself is read once.
            out_file = open(tmpOutput, mode='r')
            try:
                lines = out_file.readlines()
            finally:
                out_file.close()
            for adverb in params.output_list:
                output_dict[adverb] = self.__read_adverb(params, lines,
                                                         adverb)

        if os.access(tmpInput, os.F_OK):
            os.unlink(tmpInput)         # Remove input file.
        if os.access(tmpOutput, os.F_OK):
            os.unlink(tmpOutput)        # Remove output file.

        _free_popsno(popsno)

        del self._params[tid]
        del self._popsno[tid]
        Task.wait(self, tid)

        return output_dict
416 417 418 # In order to prevent multiple Obit instances from using the same POPS 419 # number, every Obit instance creates a lock file in /tmp. These lock 420 # files are named Obitx.yyy, where x is the POPS number (in extended 421 # hex) and yyy is the process ID of the Obit instance. 422
def _allocate_popsno():
    """Reserve and return a free Obit POPS number (1 to 15).

    A lock file ``/tmp/Obit<x>.<pid>`` is created for each candidate
    number, where <x> is the number in extended hex and <pid> is this
    process' id.  A number counts as free when no other lock file for
    it belongs to a process that still exists; lock files of vanished
    processes are removed when possible.  Raises RuntimeError when no
    number is available.
    """
    import errno

    for popsno in range(1, 16):
        prefix = '/tmp/Obit' + ehex(popsno, 1, 0)
        path = prefix + '.' + str(os.getpid())

        # In order to prevent a race, first create a lock file for
        # POPSNO.
        try:
            fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o666)
            os.close(fd)
        except OSError:
            continue

        # Get a list of likely lock files and iterate over them.
        # Leave out our own lock file though.
        files = glob.glob(prefix + '.[0-9]*')
        files.remove(path)
        for lock in files:
            # If the part after the dot isn't an integer, it's not a
            # proper lock file.
            try:
                pid = int(lock.split('.')[1])
            except (ValueError, IndexError):
                continue

            # Check whether the Obit instance is still alive.  Signal 0
            # only performs the existence/permission check.
            try:
                os.kill(pid, 0)
                alive = True
            except OSError as err:
                # EPERM means the process exists but belongs to someone
                # else: the POPS number is still taken.
                alive = (err.errno == errno.EPERM)
            except OverflowError:
                # Not a representable process id, so nothing can own it.
                alive = False

            if alive:
                # The POPS number is in use.
                break

            # The POPS number is no longer in use.  Try to clean up the
            # lock file.  This might fail though if we don't own it.
            try:
                os.unlink(lock)
            except OSError:
                pass
        else:
            # The POPS number is still free.
            return popsno

        # Clean up our own mess.
        os.unlink(path)

    raise RuntimeError("No free Obit POPS number available on this system")
468
def _free_popsno(popsno):
    """Release POPSNO by removing this process' lock file in /tmp."""
    lock_name = ''.join(['/tmp/Obit', ehex(popsno, 1, 0),
                         '.', str(os.getpid())])
    os.unlink(lock_name)
472