Package gavo :: Package grammars :: Module common
[frames] | no frames]

Source Code for Module gavo.grammars.common

  1  """ 
  2  Base classes and common code for grammars. 
  3   
  4  NOTE: If you add grammars, you have to enter manually them in 
  5  rscdef.builtingrammars.GRAMMAR_REGISTRY (we don't want to import all 
  6  the mess in this package just to make that). 
  7  """ 
  8   
  9  #c Copyright 2008-2019, the GAVO project 
 10  #c 
 11  #c This program is free software, covered by the GNU GPL.  See the 
 12  #c COPYING file in the source distribution. 
 13   
 14   
 15  import codecs 
 16  import gzip 
 17  import re 
 18  import os 
 19  import select 
 20  import subprocess 
 21   
 22  from gavo import base 
 23  from gavo import rscdef 
 24  from gavo import utils 
 25  from gavo.rscdef import procdef 
 26  from gavo.rscdef import rowtriggers 
27 28 29 -class ParseError(base.Error):
30 """is an error raised by grammars if their input is somehow wrong. 31 """
32 - def __init__(self, msg, location=None, record=None):
33 base.Error.__init__(self, msg) 34 self.location, self.record = location, record 35 self.args = [msg, location, record]
36
37 38 -class REAttribute(base.UnicodeAttribute):
39 """is an attribute containing (compiled) RE 40 """
41 - def parse(self, value):
42 if value is None or not value: 43 return None 44 try: 45 return re.compile(value) 46 except re.error as msg: 47 raise base.ui.logOldExc(base.LiteralParseError(self.name_, value, 48 hint="A python regular expression was expected here. Compile" 49 " complained: %s"%unicode(msg)))
50
51 - def unparse(self, value):
52 if value is None: 53 return "" 54 else: 55 return value.pattern
56
57 58 -class FilteredInputFile(object):
59 """a pseudo-file that allows piping data thorugh a shell command. 60 61 It supports read, readline, and close. Close closes the original 62 file, too. 63 64 Warning: the command passed in will be shell-expanded (which is fair 65 since you can pass in any command you want anyway). 66 67 If you pass silent=True, stderr will be redirected to /dev/null. 68 This is probably only attractive for unit tests and such. 69 """
70 - def __init__(self, filterCommand, origFile, silent=False):
71 self.filterCommand, self.origFile = filterCommand, origFile 72 73 processArgs = {"shell": True, 74 "stdin": subprocess.PIPE, 75 "stdout": subprocess.PIPE, 76 "close_fds": True} 77 if silent: 78 processArgs["stderr"] = open("/dev/null", "w") 79 self.process = subprocess.Popen([self.filterCommand], **processArgs) 80 self.fromChild, self.toChild = self.process.stdout, self.process.stdin 81 self.buffer = utils.StreamBuffer(2**14) 82 83 try: 84 self.PIPE_BUF = select.PIPE_BUF 85 except AttributeError: 86 # use POSIX guarantee for python<2.7 87 self.PIPE_BUF = 512
88
89 - def _fillChildBuffer(self):
90 """feeds the child new data, closing the source file if all data 91 has been fed. 92 """ 93 data = self.origFile.read(self.PIPE_BUF) 94 if data=="": 95 # source file exhausted, tell child we're done 96 self.toChild.close() 97 self.toChild = None 98 else: 99 self.toChild.write(data)
100
101 - def _readAll(self):
102 allChunks = [] 103 while True: 104 data = self.read(2**20) 105 if data=="": 106 break 107 allChunks.append(data) 108 return "".join(allChunks)
109
110 - def _fillBuffer(self):
111 """tries to obtain another chunk of data from the child, feeding 112 it new data if possible. 113 """ 114 if self.toChild: 115 writeList = [self.toChild] 116 else: 117 writeList = [] 118 readReady, writeReady, _ = select.select( 119 [self.fromChild], writeList, []) 120 121 if writeReady: 122 self._fillChildBuffer() 123 124 if readReady: 125 data = self.fromChild.read(self.PIPE_BUF) 126 if data=="": 127 self.buffer.doneWriting() 128 if self.process.wait(): 129 raise IOError("Child exited with return code %s"% 130 self.process.wait()) 131 self.buffer.add(data)
132
133 - def read(self, nBytes=None):
134 if nBytes is None: 135 return self._readAll() 136 137 while self.buffer.curSize<nBytes and not self.buffer.finished: 138 self._fillBuffer() 139 140 return self.buffer.get(nBytes) or ""
141
142 - def readline(self):
143 # let's do a quick one ourselves rather than inherit from file 144 # just for this; this just works with unix line ends. 145 while True: 146 val = self.buffer.getToChar("\n") 147 if val is None: 148 if self.buffer.finished: 149 return self.buffer.getRest() 150 else: 151 self._fillBuffer() 152 else: 153 return val
154
155 - def __iter__(self):
156 while True: 157 res = self.readline() 158 if not res: 159 break 160 yield res
161
162 - def close(self):
163 if self.process.returncode is None: 164 self.process.terminate() 165 # not checking the return code here; if the client closed 166 # while the child was still running, an error from it would 167 # not count as an error. 168 self.process.wait() 169 self.origFile.close()
170
171 172 -class Rowfilter(procdef.ProcApp):
173 """A generator for rows coming from a grammar. 174 175 Rowfilters receive rows (i.e., dictionaries) as yielded by a grammar 176 under the name row. Additionally, the embedding row iterator is 177 available under the name rowIter. 178 179 Macros are expanded within the embedding grammar. 180 181 The procedure definition *must* result in a generator, i.e., there must 182 be at least one yield; in general, this will typically be a ``yield row``, 183 but a rowfilter may swallow or create as many rows as desired. 184 185 If you forget to have a yield in the rowfilter source, you'll get a 186 "NoneType is not iterable" error that's a bit hard to understand. 187 188 Here, you can only access whatever comes from the grammar. You can 189 access grammar keys in late parameters as row[key] or, if key is 190 like an identifier, as @key. 191 """ 192 name_ = "rowfilter" 193 requiredType="rowfilter" 194 formalArgs = "row, rowIter" 195
196 - def getFuncCode(self):
198
199 200 -def compileRowfilter(filters):
201 """returns an iterator that "pipes" the rowfilters in filters. 202 203 This means that the output of filters[0] is used as arguments to 204 filters[1] and so on. 205 206 If filters is empty, None is returned. 207 """ 208 if not filters: 209 return 210 iters = [f.compile() for f in filters] #noflake: code gen 211 src = [ 212 "def iterPipe(row, rowIter):", 213 " for item0 in iters[0](row, rowIter):"] 214 for ind in range(1, len(filters)): 215 src.append("%s for item%d in iters[%d](item%d, rowIter):"%( 216 " "*ind, ind, ind, ind-1)) 217 src.append("%s yield item%d"%(" "*len(filters), len(filters)-1)) 218 d = locals() 219 exec "\n".join(src) in d 220 return d["iterPipe"]
221
222 223 -class SourceFieldApp(rscdef.ProcApp):
224 """A procedure application that returns a dictionary added to all 225 incoming rows. 226 227 Use this to programmatically provide information that can be computed 228 once but that is then added to all rows coming from a single source, usually 229 a file. This could be useful to add information on the source of a 230 record or the like. 231 232 The code must return a dictionary. The source that is about to be parsed is 233 passed in as sourceToken. When parsing from files, this simply is the file 234 name. The data the rows will be delivered to is available as "data", which 235 is useful for adding or retrieving meta information. 236 """ 237 name_ = "sourceFields" 238 239 requriedType = "sourceFields" 240 formalArgs = "sourceToken, data"
241
242 243 -class MapKeys(base.Structure):
244 """Mapping of names, specified in long or short forms. 245 246 mapKeys is necessary in grammars like keyValueGrammar or fitsProdGrammar. 247 In these, the source files themselves give key names. Within the GAVO 248 DC, keys are required to be valid python identifiers (i.e., match 249 ``[A-Za-z\_][A-Za-z\_0-9]*``). If keys coming in do not have this form, 250 mapping can force proper names. 251 252 mapKeys could also be used to make incoming names more suitable for 253 matching with shell patterns (like in rowmaker idmaps). 254 """ 255 name_ = "mapKeys" 256 257 _content = base.DataContent(description="Simple mappings in the form" 258 "<dest>:<src>{,<dest>:<src>}") 259 _mappings = base.DictAttribute("maps", keyName="dest", description= 260 "Map source names given in content to the name given in dest.", 261 itemAttD=base.UnicodeAttribute("map"), inverted=True, 262 copyable=True) 263
264 - def _parseShortenedMap(self, literal):
265 try: 266 for dest, src in (p.split(":") for p in literal.split(",")): 267 if dest not in self.maps: 268 self.maps[src.strip()] = dest.strip() 269 else: 270 raise base.LiteralParseError(self.name_, literal, 271 hint="%s clobbers an existing map within the row maker."%dest) 272 except ValueError: 273 raise base.ui.logOldExc(base.LiteralParseError(self.name_, literal, 274 hint="A key-value enumeration of the format k:v {,k:v}" 275 " is expected here"))
276
277 - def onElementComplete(self):
278 if self.content_: 279 self._parseShortenedMap(self.content_) 280 self._onElementCompleteNext(MapKeys)
281
282 - def doMap(self, aDict):
283 """returns dict with the keys mapped according to the defined mappings. 284 """ 285 if self.maps: 286 newDict = {} 287 for k, v in aDict.iteritems(): 288 newDict[self.maps.get(k, k)] = v 289 return newDict 290 else: 291 return aDict
292
293 294 -class RowIterator(object):
295 """An object that encapsulates the a source being parsed by a 296 grammar. 297 298 RowIterators are returned by Grammars' parse methods. Iterate 299 over them to retrieve the rows contained in the source. 300 301 You can also call getParameters on them to retrieve document-global 302 values (e.g., the parameters of a VOTable, a global header of 303 a FITS table). 304 305 The getLocator method should return some string that aids the user 306 in finding out why something went wrong (file name, line number, etc.) 307 308 This default implementation works for when source is a sequence 309 of dictionaries. You will, in general, want to override 310 _iteRows and getLocator, plus probably __init__ (to prepare external 311 resources) and getParameters (if you have them; make sure to update 312 any parameters you have with self.sourceRow as shown in the default 313 getParameters implementation). 314 315 RowIterators are supposed to be self-destructing, i.e., they should 316 release any external resources they hold when _iterRows runs out of 317 items. 318 319 _iterRows should arrange for the instance variable recNo to be incremented 320 by one for each item returned. 321 """ 322 notify = True 323
324 - def __init__(self, grammar, sourceToken, sourceRow=None):
325 self.grammar, self.sourceToken = grammar, sourceToken 326 self.sourceRow = sourceRow 327 self.recNo = 0
328 329 @property
330 - def recordNumber(self):
331 # compatibility alias 332 return self.recNo
333
334 - def __iter__(self):
335 if self.notify: 336 base.ui.notifyNewSource(self.sourceToken) 337 if hasattr(self, "rowfilter"): 338 baseIter = self._iterRowsProcessed() 339 else: 340 baseIter = self._iterRows() 341 if self.grammar.ignoreOn: 342 rowSource = self._filteredIter(baseIter) 343 else: 344 rowSource = baseIter 345 346 try: 347 try: 348 for row in rowSource: 349 # handle dispatched grammars here, too 350 if isinstance(row, tuple): 351 d = row[1] 352 else: 353 d = row 354 355 if isinstance(d, dict): 356 # else it could be a sentinel like FLUSH, which we leave alone 357 if self.sourceRow: 358 d.update(self.sourceRow) 359 d["parser_"] = self 360 361 yield row 362 except Exception: 363 base.ui.notifySourceError() 364 raise 365 366 finally: 367 if self.notify: 368 base.ui.notifySourceFinished()
369
370 - def _filteredIter(self, baseIter):
371 for row in baseIter: 372 if not self.grammar.ignoreOn(row): 373 yield row
374
375 - def _iterRowsProcessed(self):
376 if self.grammar.isDispatching: 377 for dest, row in self._iterRows(): 378 for procRow in self.rowfilter(row, self): 379 yield dest, procRow 380 else: 381 for row in self._iterRows(): 382 for procRow in self.rowfilter(row, self): 383 yield procRow
384
385 - def _iterRows(self):
386 if False: 387 yield None 388 self.grammar = None # don't wait for garbage collection
389
390 - def getParameters(self):
391 res = {"parser_": self} 392 if self.sourceRow: 393 res.update(self.sourceRow) 394 return res
395
396 - def getLocator(self):
397 return "(unknown position -- locator missing)"
398
399 400 -class FileRowIterator(RowIterator):
401 """is a RowIterator base for RowIterators reading files. 402 403 It analyzes the sourceToken to see if it's a string, in which case 404 it opens it as a file name and leaves the file object in self.inputFile. 405 406 Otherwise, it assumes sourceToken already is a file object and binds 407 it to self.inputFile. It then tries to come up with a sensible designation 408 for sourceToken. 409 410 It also inspects the parent grammar for a gunzip attribute. If it is 411 present and true, the input file will be unzipped transparently. 412 """
413 - def __init__(self, grammar, sourceToken, **kwargs):
414 RowIterator.__init__(self, grammar, sourceToken, **kwargs) 415 self.curLine = 1 416 try: 417 self._openFile() 418 except IOError as ex: 419 raise base.ui.logOldExc( 420 base.SourceParseError("I/O operation failed (%s)"%str(ex), 421 source=str(sourceToken), location="start"))
422
423 - def _openFile(self):
424 if isinstance(self.sourceToken, basestring): 425 if self.grammar.enc: 426 self.inputFile = codecs.open(self.sourceToken, "r", self.grammar.enc) 427 else: 428 self.inputFile = open(self.sourceToken) 429 else: 430 self.inputFile = self.sourceToken 431 self.sourceToken = getattr(self.inputFile, "name", repr(self.sourceToken)) 432 433 if hasattr(self.grammar, "preFilter") and self.grammar.preFilter: 434 self.inputFile = FilteredInputFile( 435 self.grammar.preFilter, self.inputFile) 436 437 elif hasattr(self.grammar, "gunzip") and self.grammar.gunzip: 438 self.inputFile = gzip.GzipFile(fileobj=self.inputFile)
439
440 441 -class FileRowAttributes(object):
442 """A mixin for grammars with FileRowIterators. 443 444 This provides some attributes that FileRowIterators interpret, e.g., 445 preFilter. 446 """ 447 _gunzip = base.BooleanAttribute("gunzip", description="Unzip sources" 448 " while reading? (Deprecated, use preFilter='zcat')", default=False) 449 _preFilter = base.UnicodeAttribute("preFilter", description="Shell" 450 " command to pipe the input through before passing it on to the" 451 " grammar. Classical examples include zcat or bzcat, but you" 452 " can commit arbitrary shell atrocities here.", 453 copyable=True) 454
455 - def completeElement(self, ctx):
456 if ctx.restricted: 457 if self.preFilter is not None: 458 raise base.RestrictedElement("preFilter") 459 self._completeElementNext(FileRowAttributes, ctx)
460
461 462 -class GrammarMacroMixin(base.StandardMacroMixin):
463 """A collection of macros available to rowfilters. 464 465 NOTE: All macros should return only one single physical python line, 466 or they will mess up the calculation of what constructs caused errors. 467 """
468 - def macro_inputRelativePath(self, liberalChars="True"):
469 """returns an expression giving the current source's path 470 relative to inputsDir 471 472 liberalChars can be a boolean literal (True, False, etc); if false, 473 a value error is raised if characters that will result in trouble 474 with the product mixin are within the result path. 475 476 In rowmakers fed by grammars with //products#define, better use 477 @prodtblAccref. 478 """ 479 return ('utils.getRelativePath(rowIter.sourceToken,' 480 ' base.getConfig("inputsDir"), liberalChars=%s)'%( 481 base.parseBooleanLiteral(liberalChars)))
482
483 - def macro_fullDLURL(self, dlService):
484 r"""returns a python expression giving a link to the full current data 485 set retrieved through the datalink service. 486 487 You would write \fullDLURL{dlsvc} here, and the macro will expand into 488 something like http://yourserver/currd/dlsvc/dlget?ID=ivo://whatever. 489 490 dlService is the id of the datalink service in the current RD. 491 492 This is intended for "virtual" data where the dataset is generated 493 on the fly through datalink. 494 """ 495 baseURL = self.rd.getById(dlService).getURL("dlget") 496 return ("'%%s?ID=%%s'%%(%s," 497 " urllib.quote_plus(getStandardPubDID(rowIter.sourceToken)))"%( 498 repr(baseURL)))
499
500 - def macro_dlMetaURI(self, dlService):
501 r"""like fullDLURL, except it points to the datalink metadata. 502 503 This is intended for binding to //products#define's datalink 504 parameter. 505 506 If you need the value in a rowmaker, grab it from @prodtblDatalink. 507 """ 508 baseURL = self.rd.getById(dlService).getURL("dlmeta") 509 return ("'%%s?ID=%%s'%%(%s," 510 " urllib.quote_plus(getStandardPubDID(rowIter.sourceToken)))"%( 511 repr(baseURL)))
512
514 """returns an expression for the standard path for a custom preview. 515 516 This consists of resdir, the name of the previewDir property on the 517 embedding DD, and the flat name of the accref (which this macro 518 assumes to see in its namespace as accref; this is usually the 519 case in //products#define, which is where this macro would typically be 520 used). 521 522 As an alternative, there is the splitPreviewPath macro, which does not 523 mogrify the file name. In particular, do not use standardPreviewPath 524 when you have more than a few 1e4 files, as it will have all these 525 files in a single, flat directory, and that can become a chore. 526 527 See the introduction to custom previews for details. 528 """ 529 constantPrefix = os.path.join( 530 rscdef.getInputsRelativePath(self.parent.rd.resdir), 531 self.parent.getProperty("previewDir"))+"/" 532 return (repr(constantPrefix) 533 +"+getFlatName(accref)")
534
535 - def macro_splitPreviewPath(self, ext):
536 """returns an expression for the split standard path for a custom 537 preview. 538 539 As standardPreviewPath, except that the directory hierarchy of the data 540 files will be reproduced in previews. For ext, you should typically pass 541 the extension appropriate for the preview (like {.png} or {.jpeg}). 542 543 See the introduction to custom previews for details. 544 """ 545 constantPrefix = os.path.join( 546 rscdef.getInputsRelativePath(self.parent.rd.resdir), 547 self.parent.getProperty("previewDir"))+"/" 548 return (repr(constantPrefix) 549 +"+accref+'%s'"%ext)
550
551 - def macro_rowsProcessed(self):
552 """returns an expression giving the number of records already 553 ingested for this source. 554 """ 555 return 'rowIter.line'
556
557 - def macro_sourceDate(self):
558 """returns an expression giving the timestamp of the current source. 559 """ 560 return 'datetime.utcfromtimestamp(os.path.getmtime(rowIter.sourceToken))'
561
562 - def macro_srcstem(self):
563 """returns python code for the stem of the source file currently parsed in a rowmaker. 564 565 Example: if you're currently parsing /tmp/foo.bar, the stem is foo. 566 """ 567 return 'getFileStem(rowIter.sourceToken)'
568
569 - def macro_lastSourceElements(self, numElements):
570 """returns an expression calling rmkfuncs.lastSourceElements on 571 the current input path. 572 """ 573 return 'lastSourceElements(rowIter.sourceToken, int(numElements))'
574
575 - def macro_rootlessPath(self):
576 """returns an expression giving the current source's path with 577 the resource descriptor's root removed. 578 """ 579 return ('utils.getRelativePath(rowIter.grammar.rd.resdir,' 580 ' rowIter.sourceToken)')
581
582 - def macro_inputSize(self):
583 """returns an expression giving the size of the current source. 584 """ 585 return 'os.path.getsize(rowIter.sourceToken)'
586
587 - def macro_colNames(self, tableRef):
588 """returns a comma-separated list of column names for a table reference. 589 590 This is convenient if an input file matches the table structure; you 591 can then simply say things like <reGrammar names="\\\\colName{someTable}"/>. 592 """ 593 return ",".join(c.name for c in self.rd.getById(tableRef))
594
595 - def macro_property(self, property):
596 """returns the value of property on the parent DD. 597 """ 598 return self.parent.getProperty(property)
599
600 601 -class Grammar(base.Structure, GrammarMacroMixin):
602 """An abstract grammar. 603 604 Grammars are configured via their structure parameters. Their 605 parse(sourceToken) method returns an object that iterates over rawdicts 606 (dictionaries mapping keys to (typically) strings) that can then be fed 607 through rowmakers; it also has a method getParameters that returns 608 global properties of the whole document (like parameters in VOTables; 609 this will be empty for many kinds of grammars). 610 611 RowIterators will return a reference to themselves in the raw dicts in the 612 parser_ key unless you override their _iterRowsProcessed method (which you 613 shouldn't). This is used by rowmaker macros. 614 615 What exactly sourceToken is is up to the concrete grammar. While 616 typically it's a file name, it might be a sequence of dictionaries, 617 a nevow context, or whatever. 618 619 To derive a concrete Grammar, define a RowIterator for your source 620 and set the rowIterator class attribute to it. 621 """ 622 name_ = "grammar" 623 624 _encoding = base.UnicodeAttribute("enc", default=None, description= 625 "Encoding of strings coming in from source.", copyable=True) 626 _rowfilters = base.StructListAttribute("rowfilters", 627 description="Row filters for this grammar.", 628 childFactory=Rowfilter, copyable=True) 629 _ignoreOn = base.StructAttribute("ignoreOn", default=None, copyable=True, 630 description="Conditions for ignoring certain input records. These" 631 " triggers drop an input record entirely. If you feed multiple" 632 " tables and just want to drop a row from a specific table, you" 633 " can use ignoreOn in a rowmaker.", 634 childFactory=rowtriggers.IgnoreOn) 635 _sourceFields = base.StructAttribute("sourceFields", default=None, 636 copyable=True, description="Code returning a dictionary of values" 637 " added to all returned rows.", childFactory=SourceFieldApp) 638 _properties = base.PropertyAttribute(copyable=True) 639 _original = base.OriginalAttribute() 640 _rd = rscdef.RDAttribute() 641 642 # isDispatching is used by various special grammars to signify the 643 # grammar returns rowdicts for multiple makers. See those. 644 # Here, we just fix it to false so clients can rely on the attribute's 645 # existance. 646 isDispatching = False 647 648 rowIterator = RowIterator 649
650 - def getSourceFields(self, sourceToken, data):
651 """returns a dict containing user-defined fields to be added to 652 all results. 653 """ 654 if self.sourceFields is None: 655 return None 656 if not hasattr(self, "_compiledSourceFields"): 657 self._compiledSourceFields = self.sourceFields.compile() 658 return self._compiledSourceFields(sourceToken, data)
659
660 - def parse(self, sourceToken, targetData=None):
661 ri = self.rowIterator(self, sourceToken, 662 sourceRow=self.getSourceFields(sourceToken, targetData)) 663 if self.rowfilters: 664 ri.rowfilter = compileRowfilter(self.rowfilters) 665 return ri
666
667 668 -class NullGrammar(Grammar):
669 """A grammar that never returns any rows. 670 """ 671 name_ = "nullGrammar"
672
673 674 -class TransparentGrammar(Grammar):
675 """A grammar that returns its sourceToken as the row iterator. 676 677 This only makes sense in extreme situations and never without custom 678 code. If you're not sure you need this, you don't want to know about 679 it. 680 """ 681 name_ = "transparentGrammar" 682
683 - def parse(self, sourceToken, targetData=None):
684 return sourceToken
685