
Source Code for Module gavo.rscdef.dddef

  1  """ 
  2  Definition of data. 
  3   
  4  Data descriptors describe what to do with data. They contain  
  5  a grammar, information on where to obtain source data from, and "makes", 
  6  a specification of the tables to be generated and how they are made 
  7  from the grammar output. 
  8  """ 
  9   
 10  #c Copyright 2008-2019, the GAVO project 
 11  #c 
 12  #c This program is free software, covered by the GNU GPL.  See the 
 13  #c COPYING file in the source distribution. 
 14   
 15   
 16  import datetime 
 17  import fnmatch 
 18  import glob 
 19  import os 
 20   
 21  from gavo import base 
 22  from gavo import utils 
 23  from gavo.rscdef import builtingrammars 
 24  from gavo.rscdef import column 
 25  from gavo.rscdef import common 
 26  from gavo.rscdef import rmkdef 
 27  from gavo.rscdef import scripting 
 28  from gavo.rscdef import tabledef 
 29   
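
# Illustrative sketch (not part of the original module): the processing
# model the module docstring describes, reduced to plain Python.  The
# names _demo_process, parse and map_row are hypothetical stand-ins for
# DaCHS's actual sources, grammars, and rowmakers.
def _demo_process(sources, parse, map_row, table):
	for source in sources:  # cf. element sources
		for raw_row in parse(source):  # cf. the grammar
			table.append(map_row(raw_row))  # cf. the rowmaker in a make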
 30   
class IgnoreSpec(base.Structure):
	"""A specification of sources to ignore.

	Sources mentioned here are compared against the inputsDir-relative path
	of sources generated by sources (cf. `Element sources`_). If there is
	a match, the corresponding source will not be processed.

	You can get ignored files from various sources. If you give more
	than one source, the set of ignored files is the union of the
	individual sets.

	fromdbUpdating is a bit special in that the query must return
	UTC timestamps of the file's mtime during the last ingest in addition
	to the accrefs (see the tutorial for an example).

	Macros are expanded in the RD.
	"""
	name_ = "ignoreSources"

	_fromdb = base.UnicodeAttribute("fromdb", default=None,
		description="A DB query to obtain a set of sources to ignore; the"
		" select clause must select exactly one column containing the"
		" source key. See also `Using fromdb on ignoreSources`_",
		copyable=True)
	_fromdbUpdating = base.UnicodeAttribute("fromdbUpdating",
		default=None, description="A DB query to obtain a set of sources"
		" to ignore unless the timestamp on disk is newer than"
		" what's returned. The query given must return pairs of accrefs"
		" and UTC timestamps of the last ingest. See also `Using fromdbUpdating"
		" on ignoreSources`_", copyable=True)
	_fromfile = common.ResdirRelativeAttribute("fromfile", default=None,
		description="A name of a file containing blacklisted source"
		" paths, one per line. Empty lines and lines beginning with a hash"
		" are ignored.", copyable=True)
	_patterns = base.ListOfAtomsAttribute("patterns", description=
		"Shell patterns to ignore. Slashes are treated like any other"
		" character, i.e., patterns do not know about paths.",
		itemAttD=base.UnicodeAttribute("pattern", description="Shell pattern"
		" for source file(s), relative to resource directory."),
		copyable=True)
	_rd = common.RDAttribute()
	def completeElement(self, ctx):
		self._completeElementNext(IgnoreSpec, ctx)
		if self.fromdb and self.rd:
			self.fromdb = self.rd.expand(self.fromdb)
		if self.fromdbUpdating and self.rd:
			self.fromdbUpdating = self.rd.expand(self.fromdbUpdating)

	def prepare(self, connection):
		"""sets attributes to speed up isIgnored()
		"""
		self.inputsDir = base.getConfig("inputsDir")
		# ignored is a dict either mapping to None (ignore unconditionally)
		# or to a UTC datetime.datetime instance (ignore if not newer than
		# that datetime).
		self.ignored = {}

		if self.fromdb and connection is not None:
			try:
				with connection.savepoint():
					for r in connection.query(self.fromdb):
						self.ignored[r[0]] = None
			except base.DBError: # table probably doesn't exist yet.
				base.ui.notifyError("ignore fromdb failed --"
					" unless you have a typo in the query, fix this by importing the RD")

		if self.fromdbUpdating and connection is not None:
			try:
				with connection.savepoint():
					for r in connection.query(self.fromdbUpdating):
						self.ignored[r[0]] = r[1]
			except base.DBError: # table probably doesn't exist yet.
				base.ui.notifyError("ignore fromdbUpdating failed --"
					" unless you have a typo in the query, fix this by importing the RD")

		if self.fromfile:
			for ln in open(self.fromfile):
				ln = ln.strip()
				if ln and not ln.startswith("#"):
					self.ignored[ln] = None

	def isIgnored(self, path):
		"""returns true if path, made inputsDir-relative, should be ignored.
		"""
		try:
			path = utils.getRelativePath(path, self.inputsDir, liberalChars=True)
		except ValueError: # not in inputs, use full path.
			pass

		lastMtime = self.ignored.get(path, base.NotGiven)
		if lastMtime is base.NotGiven:
			# not mentioned: proceed to pattern test
			pass
		elif lastMtime is None:
			# unconditionally ignored
			return True
		else:
			# lastMtime is given, now see if the file has been touched.
			if (lastMtime<
					datetime.datetime.utcfromtimestamp(
						os.path.getmtime(os.path.join(self.inputsDir, path)))):
				# changed since last import, proceed to pattern test
				# (which, of course, has not triggered before either; ah well).
				pass
			else:
				# file unchanged, ignore
				return True

		for pat in self.patterns:
			if fnmatch.fnmatch(path, pat):
				return True
		return False

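
# Illustrative sketch (not part of the original module): the decision
# isIgnored implements, using only the module-level imports above.  The
# function name and arguments are hypothetical; ignored maps paths to
# None (skip unconditionally) or to the UTC mtime at the last ingest
# (skip unless the file on disk is newer).
def _demo_is_ignored(path, ignored, patterns):
	if path in ignored:
		last_mtime = ignored[path]
		if last_mtime is None:
			return True
		on_disk = datetime.datetime.utcfromtimestamp(os.path.getmtime(path))
		if on_disk<=last_mtime:
			return True
	return any(fnmatch.fnmatch(path, pat) for pat in patterns)
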
class SourceSpec(base.Structure):
	"""A specification of a data descriptor's inputs.

	This will typically be files taken from a file system. If so, DaCHS will,
	in each directory, process the files in alphabetical order. No guarantees
	are made as to the sequence directories are processed in.

	Multiple patterns are processed in the order given in the RD.
	"""
	name_ = "sources"

	_patterns = base.ListOfAtomsAttribute("patterns", description=
		"Paths to the source files. You can use shell patterns here.",
		itemAttD=base.UnicodeAttribute("pattern", description="Shell pattern"
		" for source file(s), relative to resource directory."),
		copyable=True)
	_items = base.ListOfAtomsAttribute("items", description=
		"String literals to pass to grammars. In contrast to patterns,"
		" they are not interpreted as file names but passed to the"
		" grammar verbatim. Normal grammars do not like this. It is"
		" mainly intended for use with custom or null grammars.",
		itemAttD=base.UnicodeAttribute("item",
			description="Grammar-specific string"), copyable=True)
	_recurse = base.BooleanAttribute("recurse", default=False,
		description="Search for pattern(s) recursively in their directory"
		" part(s)?", copyable=True)
	_ignore = base.StructAttribute("ignoredSources", childFactory=
		IgnoreSpec, description="Specification of sources that should not"
		" be processed although they match patterns. Typically used"
		" in update-type data descriptors.", copyable=True)
	_file = base.DataContent(description="A single"
		" file name (this is for convenience)", copyable=True)
	_original = base.OriginalAttribute()
	def __iter__(self):
		return self.iterSources()

	def completeElement(self, ctx):
		if self.ignoredSources is base.Undefined:
			self.ignoredSources = base.makeStruct(IgnoreSpec)

		newPatterns = []
		for pat in self.patterns:
			try:
				newPatterns.append(pat.encode("ascii"))
			except UnicodeEncodeError:
				raise base.DataError("Pattern %r contains non-ASCII"
					" characters. DaCHS currently"
					" forbids that. If you need non-ASCII in your file"
					" names, complain to dachs-users@g-vo.org."%pat)
		self.patterns = newPatterns

		self._completeElementNext(SourceSpec, ctx)

	def _expandDirParts(self, dirParts, ignoreDotDirs=True):
		"""expands a list of directories into a list of them and all their
		descendants.

		It follows symbolic links but doesn't do any bookkeeping, so bad
		things will happen if the directory graph contains cycles.
		(A simplified sketch of this expansion follows the class.)
		"""
		res = []
		for root in dirParts:
			for root, dirs, files in os.walk(root):
				if ignoreDotDirs:
					if os.path.basename(root).startswith("."):
						continue
					# prune in place so os.walk does not descend into dot dirs
					dirs[:] = [dir for dir in dirs if not dir.startswith(".")]
				dirs = (os.path.join(root, dir) for dir in dirs)
				res.extend(dir for dir in dirs if os.path.isdir(dir))
				for child in files:
					destName = os.path.join(root, child)
					if os.path.islink(destName) and not os.path.isfile(destName):
						res.extend(self._expandDirParts([destName]))
		return res

	def iterSources(self, connection=None):
		self.ignoredSources.prepare(connection)
		for item in self.items:
			if not self.ignoredSources.isIgnored(item):
				yield item

		baseDir = ""
		if self.parent.rd:
			baseDir = self.parent.rd.resdir.encode("ascii")

		for pattern in self.patterns:
			dirPart, baseName = os.path.split(pattern)
			if self.parent.rd:
				dirParts = [os.path.join(baseDir, dirPart)]
			else:
				dirParts = [dirPart]
			if self.recurse:
				dirParts = dirParts+self._expandDirParts(dirParts)

			for dir in sorted(dirParts):
				for name in sorted(glob.glob(os.path.join(dir, baseName))):
					fullName = os.path.abspath(name)
					if not self.ignoredSources.isIgnored(fullName):
						try:
							yield fullName.encode("ascii")
						except (UnicodeDecodeError, UnicodeEncodeError):
							raise base.DataError("File %r has"
								" non-ASCII name components. DaCHS currently"
								" forbids that. If you need non-ASCII in your file"
								" names, complain to dachs-users@g-vo.org."%(
									fullName))

		if self.content_:
			yield os.path.abspath(os.path.join(baseDir, self.content_))

	def __nonzero__(self):
		return bool(self.patterns or self.items or self.content_)

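
# Illustrative sketches (not part of the original module).  First, what
# recurse=True boils down to: collecting a directory's non-hidden
# descendant directories.  A simplified, hypothetical stand-in for
# _expandDirParts that delegates link-following to os.walk instead of
# recursing manually.
def _demo_expand_dirs(top):
	found = []
	for root, dirs, files in os.walk(top, followlinks=True):
		dirs[:] = [d for d in dirs if not d.startswith(".")]  # prune dot dirs
		found.extend(os.path.join(root, d) for d in dirs)
	return found

# Second, the core of the pattern branch in iterSources: glob each
# pattern and yield its matches in sorted (hence per-directory
# alphabetical) order.  Names are again hypothetical.
def _demo_iter_matching(base_dir, pattern):
	dir_part, base_name = os.path.split(pattern)
	for name in sorted(glob.glob(os.path.join(base_dir, dir_part, base_name))):
		yield os.path.abspath(name)
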
class Make(base.Structure, scripting.ScriptingMixin):
	"""A build recipe for tables belonging to a data descriptor.

	All makes belonging to a DD will be processed in the order in which they
	appear in the file.
	"""
	name_ = "make"

	_table = base.ReferenceAttribute("table",
		description="Reference to the table to be embedded",
		default=base.Undefined,
		copyable=True,
		forceType=tabledef.TableDef)

	_rowmaker = base.ReferenceAttribute("rowmaker",
		default=base.NotGiven,
		forceType=rmkdef.RowmakerDef,
		description="The rowmaker (i.e., mapping rules from grammar keys to"
			" table columns) for the table being made.",
		copyable=True)

	_parmaker = base.ReferenceAttribute("parmaker",
		default=base.NotGiven,
		forceType=rmkdef.ParmakerDef,
		description="The parmaker (i.e., mapping rules from grammar parameters"
			" to table parameters) for the table being made. You will usually"
			" not give a parmaker.",
		copyable=True)

	_role = base.UnicodeAttribute("role",
		default=None,
		description="The role of the embedded table within the data set",
		copyable=True)

	_rowSource = base.EnumeratedUnicodeAttribute("rowSource",
		default="rows",
		validValues=["rows", "parameters"],
		description="Source for the raw rows processed by this rowmaker.",
		copyable=True,
		strip=True)
	def __repr__(self):
		return "Make(table=%r, rowmaker=%r)"%(
			self.table and self.table.id, self.rowmaker and self.rowmaker.id)

	def onParentComplete(self):
		if self.rowmaker is base.NotGiven:
			self.rowmaker = rmkdef.RowmakerDef.makeIdentityFromTable(self.table)

	def getExpander(self):
		"""used by the scripts for expanding their source.

		We always return the expander of the table being made.
		"""
		return self.table.getExpander()

	def create(self, connection, parseOptions, tableFactory, **kwargs):
		"""returns a new empty instance of the table this is making.
		"""
		newTable = tableFactory(self.table,
			parseOptions=parseOptions, connection=connection, role=self.role,
			create=True, **kwargs)
		if (self.table.onDisk
				and not getattr(self.parent, "updating", False)):
			newTable._runScripts = self.getRunner()
		return newTable

	def runParmakerFor(self, grammarParameters, destTable):
		"""feeds grammarParameters to destTable.
		"""
		if self.parmaker is base.NotGiven:
			return
		parmakerFunc = self.parmaker.compileForTableDef(destTable.tableDef)
		destTable.setParams(parmakerFunc(grammarParameters, destTable),
			raiseOnBadKeys=False)

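
# Illustrative sketch (not part of the original module): what a parmaker
# conceptually does -- derive table parameters from the grammar's
# parameters.  Real parmakers are compiled from RD mapping rules; the
# names below are hypothetical.
def _demo_apply_parmaker(mappings, grammar_params):
	# mappings: table param name -> function of the grammar parameters
	return dict((name, get(grammar_params))
		for name, get in mappings.items())
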
class DataDescriptor(base.Structure, base.ComputedMetaMixin,
		common.IVOMetaMixin, tabledef.PublishableDataMixin):
	"""A description of how to process data from a given set of sources.

	Data descriptors bring together a grammar, a source specification and
	"makes", each giving a table and a rowmaker to feed the table from the
	grammar output.

	They are the "executable" parts of a resource descriptor. Their ids
	are used as arguments to gavoimp for partial imports.
	"""
	name_ = "data"
	resType = "data"

	_rowmakers = base.StructListAttribute("rowmakers",
		childFactory=rmkdef.RowmakerDef,
		description="Embedded build rules (preferably put rowmakers directly"
			" into make elements)",
		copyable=True,
		before="makes")

	_tables = base.StructListAttribute("tables",
		childFactory=tabledef.TableDef,
		description="Embedded table definitions (usually, tables are defined"
			" toplevel)",
		copyable=True,
		before="makes")

	_grammar = base.MultiStructAttribute("grammar",
		default=None,
		childFactory=builtingrammars.getGrammar,
		childNames=builtingrammars.GRAMMAR_REGISTRY.keys(),
		description="Grammar used to parse this data set.",
		copyable=True,
		before="makes")

	_sources = base.StructAttribute("sources",
		default=None,
		childFactory=SourceSpec,
		description="Specification of sources that should be fed to the grammar.",
		copyable=True,
		before="grammar")

	_dependents = base.ListOfAtomsAttribute("dependents",
		itemAttD=base.UnicodeAttribute("recreateAfter"),
		description="A data ID to recreate when this resource is"
			" remade; use # syntax to reference in other RDs.")

	_auto = base.BooleanAttribute("auto",
		default=True,
		description="Import this data set if not explicitly"
			" mentioned on the command line?")

	_updating = base.BooleanAttribute("updating",
		default=False,
		description="Keep existing tables on import? You usually want this"
			" False unless you have some kind of sources management,"
			" e.g., via a sources ignore specification.",
		copyable=True)

	_makes = base.StructListAttribute("makes",
		childFactory=Make,
		copyable=True,
		description="Specification of a target table and the rowmaker"
			" to feed it.")

	_params = common.ColumnListAttribute("params",
		childFactory=column.Param,
		description='Param ("global columns") for this data (mostly for'
			' VOTable serialization).',
		copyable=True)

	_properties = base.PropertyAttribute()

	_rd = common.RDAttribute()

	_original = base.OriginalAttribute()

	metaModel = ("title(1), creationDate(1), description(1),"
		"subject, referenceURL(1)")
	def __repr__(self):
		return "<data descriptor with id %s>"%self.id

	def validate(self):
		self._validateNext(DataDescriptor)
		if self.registration and self.id is None:
			raise base.StructureError("Published data needs an assigned id.")

	def onElementComplete(self):
		self._onElementCompleteNext(DataDescriptor)
		for t in self.tables:
			t.setMetaParent(self)
		if self.registration:
			self.registration.register()

	# since we want to be able to create DDs dynamically, they must find their
	# meta parent themselves. We do this while the DD is being adopted;
	# the rules here are: if the parent is a meta mixin itself, it's the
	# meta parent; if it has an rd attribute, use that; else give up.
	# TODO: For DDs on cores, it would be *desirable* to come up
	# with some magic that makes the current service their meta parent.

	def _getParent(self):
		return self.__parent

	def _setParent(self, value):
		self.__parent = value
		if isinstance(value, base.MetaMixin):
			self.setMetaParent(value)
		elif hasattr(value, "rd"):
			self.setMetaParent(value.rd)

	parent = property(_getParent, _setParent)

	def iterSources(self, connection=None):
		if self.sources:
			return self.sources.iterSources(connection)
		else:
			return iter([])

	def __iter__(self):
		for m in self.makes:
			yield m.table

	def iterTableDefs(self):
		"""iterates over the definitions of all the tables built by this DD.

		This will not include system tables.
		"""
		for m in self.makes:
			if not m.table.system:
				yield m.table

	def getTableDefById(self, id):
		for td in self.iterTableDefs():
			if td.id==id:
				return td
		raise base.StructureError("No table named %s will be built"%id)

	def getTableDefWithRole(self, role):
		for m in self.makes:
			if m.role==role:
				return m.table
		raise base.StructureError("No table def with role '%s'"%role)

	def getPrimary(self):
		"""returns the "primary" table definition in the data descriptor.

		"primary" means the only table in a one-table dd, or the table with
		the role "primary" if there are more. If no matching table is found,
		a StructureError is raised. (A reduced sketch of this rule is given
		at the end of the module.)
		"""
		if len(self.makes)==1:
			return self.makes[0].table
		else:
			try:
				return self.getTableDefWithRole("primary")
			except base.StructureError: # raise a more telling message
				pass
		raise base.StructureError("Ambiguous request for primary table")

	def copyShallowly(self):
		"""returns a shallow copy of self.

		Sources are not copied.
		"""
		return DataDescriptor(self.parent, rowmakers=self.rowmakers[:],
			tables=self.tables[:], grammar=self.grammar, makes=self.makes[:])

	def getURL(self, rendName, absolute=True):
		# there's no sensible URL for DDs; thus, let people browse
		# the RD info, where they should at least find links to any
		# tables included here.
		basePath = "%sbrowse/%s"%(
			base.getConfig("web", "nevowRoot"),
			self.rd.sourceId)
		if absolute:
			return base.makeAbsoluteURL(basePath)
		return basePath
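

# Illustrative sketch (not part of the original module): getPrimary's
# resolution rule over plain data.  makes is a hypothetical list of
# (role, tableDef) pairs standing in for self.makes.
def _demo_pick_primary(makes):
	if len(makes)==1:
		return makes[0][1]
	for role, table_def in makes:
		if role=="primary":
			return table_def
	raise ValueError("Ambiguous request for primary table")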