Package gavo :: Package registry :: Module publication
[frames] | [no frames]

Source Code for Module gavo.registry.publication

  1  """ 
  2  "Publishing" service records -- grammar-type stuff and UI. 
  3   
  4  This module basically turns "publishable things" -- services, resource 
  5  records, data items -- into row dictionaries that can be entered into 
  6  the database. 
  7   
  8  This is one half of getting them into the registry.  The other half is 
  9  done in identifiers and builders; these take the stuff from the database, 
  10  rebuild actual objects and create registry records from them.  So, 
 11  the content of the service table is not actually used to build resource 
 12  records. 
 13  """ 
 14   
 15  #c Copyright 2008-2019, the GAVO project 
 16  #c 
 17  #c This program is free software, covered by the GNU GPL.  See the 
 18  #c COPYING file in the source distribution. 
 19   
 20   
 21  import datetime 
 22  import itertools 
 23  import os 
 24  import urlparse 
 25   
 26  import pkg_resources 
 27   
 28  from gavo import base 
 29  from gavo import grammars 
 30  from gavo import rsc 
 31  from gavo import rscdef 
 32  from gavo import utils 
 33   
 34  from gavo.registry import builders 
 35  from gavo.registry import common 
 36   
 37   
 38  # Names of renders that should not be shown to humans; these are 
 39  # also not included in the dc.interfaces table.  Right now, this 
 40  # list only contains the VOSI renderers, but other infrastructure-type 
 41  # capabilites might belong here, too. 
 42  HIDDEN_RENDERERS = frozenset([ 
 43          'tableMetadata', 'availability', 'capabilities']) 
@utils.memoized
def getManagedAuthorities():
    """returns the authorities managed by our main registry.

    The result is a frozenset of the contents of the managedAuthority
    meta items of //services#registry; it is cached via utils.memoized.
    """
    registryService = base.resolveCrossId("//services#registry")
    authorities = [item.getContent()
        for item in registryService.iterMeta("managedAuthority")]
    return frozenset(authorities)
def makeBaseRecord(res, keepTimestamp=False):
    """returns a dictionary giving the metadata common to resource records.

    res is the thing to describe (a service, resource record, table or
    data descriptor).  Before any metadata is collected, res is validated
    and a VOResource element is built for it; both bomb out if critical
    metadata is missing.

    If the authority of res's identifier is not in getManagedAuthorities(),
    a warning is emitted and base.SkipThis is raised.

    With keepTimestamp, the recTimestamp meta of res (if it has one)
    replaces the current time as the record timestamp.  dateUpdated is
    the datetimeUpdated meta as a string or, lacking that, the current
    UTC time.
    """
    # these two bomb out if critical metadata is missing or if, for some
    # reason, no resource record can be built for res
    base.validateStructure(res)
    builders.getVOResourceElement(res)

    # we must not publish resources from authorities we don't manage
    ivoid = base.getMetaText(res, "identifier")
    if urlparse.urlparse(ivoid).netloc not in getManagedAuthorities():
        base.ui.notifyWarning("Skipping publication of resource"
            " with identifier %s because we don't manage its authority"
            % ivoid)
        raise base.SkipThis("Resource from non-managed authority")

    rec = {
        "ivoid": ivoid,
        "shortName": base.getMetaText(res, "shortName"),
        "sourceRD": res.rd.sourceId,
        "resId": res.id,
        "title": base.getMetaText(res, "title", propagate=True),
        "deleted": False,
        "recTimestamp": datetime.datetime.utcnow(),
        "description": base.getMetaText(res, "description"),
        "authors": "; ".join(meta.getContent("text")
            for meta in res.iterMeta("creator.name", propagate=True)),
    }
    updated = res.getMeta("datetimeUpdated")

    if keepTimestamp:
        try:
            rec["recTimestamp"] = utils.parseISODT(
                base.getMetaText(res, "recTimestamp"))
        except base.NoMetaKey:
            pass  # never published, so there is no timestamp to keep

    rec["dateUpdated"] = (datetime.datetime.utcnow() if updated is None
        else str(updated))
    return rec
def iterAuthorsAndSubjects(resource, sourceRD, resId):
    """yields (tableName, row) pairs for the subjects and authors tables.

    resource is the meta-carrier for the resource to be described,
    sourceRD and resId are its keys in the resources table.

    Each subject meta item yields one "subjects" row.  Without any
    subjects, a single row with the subject str(None) is yielded.

    creator.name meta items (looked up with propagation) may contain
    several authors separated by semicolons.  Each non-empty author
    yields one "authors" row, except entries starting with "et al".
    """
    # NOTE(review): str(None) is the literal string "None"; presumably this
    # is deliberate so subject-less resources still get a subjects row --
    # confirm before changing.
    subjects = resource.getMeta("subject") or (None,)
    for subject in [str(item) for item in subjects]:
        yield ("subjects", {
            "sourceRD": sourceRD,
            "resId": resId,
            "subject": subject})

    # for authors, we support a special notation, separating individual
    # authors with semicolons.
    for creator in resource.iterMeta("creator.name", propagate=True):
        for author in creator.getContent("text").split(";"):
            author = author.strip()
            # empty entries come from stray semicolons (e.g., "A; B;")
            if not author or author.startswith("et al"):
                continue
            yield ("authors", {
                "sourceRD": sourceRD,
                "resId": resId,
                "author": author})
def iterSvcRecs(service, keepTimestamp=False):
    """iterates over records suitable for importing into the service list
    for service.

    Yields (tableName, row) pairs: one "resources" row, then, for each
    publication that is neither auxiliary nor uses a renderer from
    HIDDEN_RENDERERS, an "interfaces" row followed by one "sets" row per
    set of the publication, and finally the authors and subjects rows.
    Nothing is yielded if the service has no publications or if
    makeBaseRecord raises base.SkipThis.
    """
    if not service.publications:
        return  # don't worry about missing meta if there are no publications

    try:
        rec = makeBaseRecord(service, keepTimestamp)
    except base.SkipThis:
        return

    rec["owner"] = service.limitTo
    yield ("resources", rec)

    # each publication becomes one interface, except for auxiliary
    # and VOSI publications, which are for the VO registry only.
    for pub in service.publications:
        if pub.auxiliary or pub.render in HIDDEN_RENDERERS:
            continue

        try:
            browseable = service.isBrowseableWith(pub.render)
        except AttributeError:  # service is not a ServiceBasedPage
            browseable = False

        pubService = pub.service or service

        intfRec = {
            "sourceRD": rec["sourceRD"],
            "resId": rec["resId"],
            "renderer": pub.render,
            "accessURL": pubService.getURL(pub.render, absolute=False),
            "referenceURL": base.getMetaText(pubService, "referenceURL"),
            "browseable": browseable,
            "deleted": False}
        yield ("interfaces", intfRec)

        for setName in pub.sets:
            # Every sets row needs a dict of its own: the consumer may still
            # hold (e.g., batch) intfRec and earlier set rows, and mutating a
            # shared dict would clobber their setName.
            setRec = intfRec.copy()
            setRec["setName"] = setName
            yield ("sets", setRec)

    for pair in iterAuthorsAndSubjects(service,
            rec["sourceRD"], rec["resId"]):
        yield pair
def iterResRecs(res, keepTimestamp=False):
    """as iterSvcRecs, just for ResRecs rather than Services.

    Yields the same record dict as both the "resources" and the "sets"
    row (set ivo_managed, renderer rcdisplay), followed by the authors and
    subjects rows.  Nothing is yielded if makeBaseRecord raises
    base.SkipThis.
    """
    try:
        rec = makeBaseRecord(res, keepTimestamp)
    except base.SkipThis:
        return

    # resource records only make sense if destined for the registry
    rec.update({"setName": "ivo_managed", "renderer": "rcdisplay"})
    for tableName in ("resources", "sets"):
        yield (tableName, rec)

    for pair in iterAuthorsAndSubjects(res, rec["sourceRD"], rec["resId"]):
        yield pair
def iterDataRecs(res, keepTimestamp=False):
    """as iterSvcRecs, just for DataDescriptors rather than Services.

    Yields a "resources" row, then a "sets" row (renderer rcdisplay) for
    each set in res.registration.sets.  For the set "local", an
    additional "interfaces" row is yielded whose accessURL is the
    referenceURL with "?tapinfo=True" appended (None without a
    referenceURL).  Authors and subjects rows come last.  Nothing is
    yielded if makeBaseRecord raises base.SkipThis.
    """
    try:
        rec = makeBaseRecord(res, keepTimestamp)
    except base.SkipThis:
        return

    yield ("resources", rec)
    for setName in res.registration.sets:
        # build a fresh dict rather than adding set keys to rec, which has
        # already been handed out as the resources row.
        setRec = rec.copy()
        setRec["setName"] = setName
        setRec["renderer"] = "rcdisplay"
        yield ("sets", setRec)

        # if we have a local publication, add a fake interface with
        # an accessURL pointing towards TAP
        if setName=="local":
            refURL = base.getMetaText(res, "referenceURL", macroPackage=res.rd)
            yield ("interfaces", {
                "sourceRD": rec["sourceRD"],
                "resId": rec["resId"],
                "renderer": "rcdisplay",
                "accessURL": refURL+"?tapinfo=True" if refURL else None,
                "referenceURL": refURL,
                "browseable": False,
                "deleted": False})

    for pair in iterAuthorsAndSubjects(res,
            rec["sourceRD"], rec["resId"]):
        yield pair
class RDRscRecIterator(grammars.RowIterator):
    """A RowIterator yielding resource records for inclusion into the
    service list for the services defined in the source token RD.

    The rows are the (tableName, row) pairs produced by iterSvcRecs,
    iterResRecs and iterDataRecs for the RD's services, resource records,
    and registered tables and data descriptors, in that order.  Nothing is
    produced when the grammar's unpublish flag is set.
    """
    def _iterRows(self):
        if self.grammar.unpublish:
            return

        rd = self.sourceToken
        keep = self.grammar.keepTimestamp

        for svc in rd.services:
            self.curSource = svc.id
            for row in iterSvcRecs(svc, keep):
                yield row

        for resRec in rd.resRecs:
            self.curSource = resRec.id
            for row in iterResRecs(resRec, keep):
                yield row

        for item in itertools.chain(rd.tables, rd.dds):
            self.curSource = item.id
            if not item.registration:
                continue
            for row in iterDataRecs(item, keep):
                yield row

        # If the RD has a TAP-published table that is not registered by
        # itself, bump the TAP service's timestamps so it gets re-harvested
        # (a self-registered table's own update is deemed enough for now --
        # TODO: reconsider this when we allow TAP registration without an
        # associated tableset).  We directly update the timestamps because
        # producing a TAP record can be fairly expensive.
        if rd.sourceId=="__system__/tap":
            return
        if any(table.adql and not table.registration for table in rd.tables):
            with base.getWritableAdminConn() as conn:
                conn.execute("UPDATE dc.resources SET"
                    " dateupdated=CURRENT_TIMESTAMP,"
                    " rectimestamp=CURRENT_TIMESTAMP"
                    " WHERE sourcerd='__system__/tap'")

    def getLocation(self):
        """returns "<RD id>#<id of the element currently being processed>".
        """
        return "%s#%s"%(self.sourceToken.sourceId, self.curSource)
# extra handwork to deal with timestamps on deleted records
def getDeletedIdentifiersUpdater(conn, rd):
    """returns a function to be called after records have been
    updated to mark new deleted identifiers as changed.

    The problem solved here is that we mark all resource metadata
    belonging to an RD as deleted before feeding the new stuff in.  We
    don't want to change resources.rectimestamp there; this would, for
    instance, bump old deleted records.

    So, this takes a snapshot of the ivoids of rd's deleted records
    right away.  The function returned fetches them again and sets
    rectimestamp to the current UTC time for those ivoids not in the
    snapshot.  We don't need to like it.
    """
    def fetchDeletedIvoids():
        rows = conn.query("select ivoid from dc.resources"
            " where sourcerd=%(rdid)s and deleted",
            {"rdid": rd.sourceId})
        return set(row[0] for row in rows)

    deletedBefore = fetchDeletedIvoids()

    def bumpNewDeletedRecords():
        newlyDeleted = fetchDeletedIvoids()-deletedBefore
        if not newlyDeleted:
            return
        conn.execute("update dc.resources set rectimestamp=%(now)s"
            " where ivoid in %(ivoids)s", {
                "now": datetime.datetime.utcnow(),
                "ivoids": newlyDeleted})

    return bumpNewDeletedRecords
class RDRscRecGrammar(grammars.Grammar):
    """A grammar for "parsing" raw resource records from RDs.

    The source token is an RD; rows come from RDRscRecIterator as
    (tableName, row) pairs for the registry tables.
    """
    rowIterator = RDRscRecIterator
    # presumably tells the grammar machinery that rows are
    # (tableName, row) pairs to be dispatched to several tables -- confirm
    isDispatching = True

    # this is a flag to try and keep the registry timestamps as they are
    # during republication.
    keepTimestamp = False
    # setting the following to True will inhibit the re-creation of
    # resources even if they're still defined in the RD (the row iterator
    # then yields nothing).
    unpublish = False
def updateServiceList(rds,
        metaToo=False,
        connection=None,
        onlyWarn=True,
        keepTimestamp=False,
        unpublish=False):
    """updates the services defined in rds in the services table in the database.

    This is what actually does the publication.  For each RD, the
    records produced by RDRscRecGrammar are fed through the services RD's
    "tables" data descriptor, the RD is fed through "deptable", and
    the timestamps of newly deleted records are bumped.

    rds -- the RDs to publish; their sourceIds must not be absolute.
    metaToo -- if true, also unpublish the RD from TAP, call updateMeta on
      the data for everything iterating over the RD yields (presumably its
      data descriptors), and publish it to TAP again.
    connection -- an admin database connection.  If None, one is opened
      here, and it is closed again before this function returns.
    onlyWarn -- if true, metadata problems in an RD only produce a warning;
      otherwise, a base.ReportableError is raised.
    keepTimestamp, unpublish -- set on the RDRscRecGrammar used.

    connection.commit() is called once, after all RDs have been processed.
    Other exceptions are reported through base.ui.notifyError and re-raised.

    Returns the sum of the nAffected counts of the resource-record imports.
    """
    recordsWritten = 0
    parseOptions = rsc.getParseOptions(validateRows=True, batchSize=20)
    ownsConnection = connection is None
    if ownsConnection:
        connection = base.getDBConnection("admin")

    try:
        dd = common.getServicesRD().getById("tables")
        dd.grammar = base.makeStruct(RDRscRecGrammar)
        dd.grammar.keepTimestamp = keepTimestamp
        dd.grammar.unpublish = unpublish

        depDD = common.getServicesRD().getById("deptable")
        msg = None
        for rd in rds:
            if rd.sourceId.startswith("/"):
                raise base.Error("Resource descriptor ID must not be absolute, but"
                    " '%s' seems to be."%rd.sourceId)

            deletedUpdater = getDeletedIdentifiersUpdater(connection, rd)

            try:
                data = rsc.makeData(dd, forceSource=rd, parseOptions=parseOptions,
                    connection=connection)
                recordsWritten += data.nAffected
                rsc.makeData(depDD, forceSource=rd, connection=connection)

                if metaToo:
                    from gavo.protocols import tap
                    tap.unpublishFromTAP(rd, connection)
                    for dependentDD in rd:
                        rsc.Data.create(dependentDD, connection=connection).updateMeta()
                    tap.publishToTAP(rd, connection)

                deletedUpdater()

            except base.MetaValidationError as ex:
                msg = ("Aborting publication of rd '%s' since meta structure of"
                    " %s (id='%s') is invalid:\n * %s")%(
                    rd.sourceId, repr(ex.carrier), ex.carrier.id, "\n * ".join(ex.failures))
            except base.NoMetaKey as ex:
                msg = ("Aborting publication of '%s' at service '%s': Resource"
                    " record generation failed: %s"%(
                    rd.sourceId, ex.carrier.id, str(ex)))
            except Exception as ex:
                base.ui.notifyError("Fatal error while publishing from RD %s: %s"%(
                    rd.sourceId, str(ex)))
                raise

            if msg is not None:
                if onlyWarn:
                    base.ui.notifyWarning(msg)
                else:
                    raise base.ReportableError(msg)
                msg = None

        connection.commit()
    finally:
        # A connection we opened ourselves must not leak, neither on success
        # nor when an exception propagates; on the error paths nothing has
        # been committed.
        if ownsConnection:
            connection.close()
    return recordsWritten
def _purgeFromServiceTables(rdId, conn):
    """purges all resources coming from rdId from the registry tables.

    This is not for user code that should rely on the tables doing the
    right thing (e.g., setting the deleted flag rather than deleting rows).
    Test code that is not in contact with the actual registry might want
    this, though (until postgres grows nested transactions).

    The cursor used is closed even if one of the deletes fails.
    """
    cursor = conn.cursor()
    try:
        for tableName in [
                "resources", "interfaces", "sets", "subjects", "res_dependencies",
                "authors"]:
            # tableName comes from the fixed list above, so interpolating it
            # is safe; rdId goes through a query parameter.
            cursor.execute("delete from dc.%s where sourceRD=%%(rdId)s"%tableName,
                {"rdId": rdId})
    finally:
        cursor.close()
def makeDeletedRecord(ivoid, conn):
    """adds sentinel rows to the internal service tables marking ivoid
    as deleted.

    A "resources" row and a "sets" row (set ivo_managed) are written
    through conn.  Both use sourceRD "deleted" and a random 20-character
    resId, so they cannot collide with records from real RDs.
    """
    fakeId = utils.getRandomString(20)
    svcRD = base.caches.getRD("//services")

    resourcesTable = rsc.TableForDef(svcRD.getById("resources"),
        connection=conn)
    resourcesTable.addRow(dict(
        sourceRD="deleted",
        resId=fakeId,
        shortName="deleted",
        title="Anonymous Deleted Record",
        description="This is a sentinel for a record once published"
            " by this registry but now dropped.",
        owner=None,
        dateUpdated=datetime.datetime.utcnow(),
        recTimestamp=datetime.datetime.utcnow(),
        deleted=True,
        ivoid=ivoid,
        authors=""))

    setsTable = rsc.TableForDef(svcRD.getById("sets"),
        connection=conn)
    setsTable.addRow(dict(
        sourceRD="deleted",
        resId=fakeId,
        setName="ivo_managed",
        renderer="custom",
        deleted=True))
################ UI stuff

def findAllRDs():
    """returns ids of all RDs (inputs and built-in) known to the system.

    RDs below inputsDir are identified by their path relative to
    inputsDir without the .rd extension; a directory containing a file
    named DACHS_PRUNE is skipped together with everything below it.
    Built-in RDs get ids of the form __system__/<name>.
    """
    rds = []
    inputsDir = base.getConfig("inputsDir")
    for dirPath, subDirs, fileNames in os.walk(inputsDir):
        if "DACHS_PRUNE" in fileNames:
            # os.walk only honours in-place changes to the directory list;
            # merely rebinding the name would not stop it from descending.
            subDirs[:] = []
            continue

        for fileName in fileNames:
            if fileName.endswith(".rd"):
                rds.append(os.path.splitext(
                    utils.getRelativePath(
                        os.path.join(dirPath, fileName), inputsDir))[0])

    for name in pkg_resources.resource_listdir('gavo',
            "resources/inputs/__system__"):
        if not name.endswith(".rd"):  # ignore VCS files (and possibly others:-)
            continue
        rds.append(os.path.splitext("__system__/%s"%name)[0])
    return rds
def findPublishedRDs():
    """returns the ids of all RDs which have been published before.

    These are the distinct sourcerd values of non-deleted rows in
    dc.resources.
    """
    query = "select distinct sourcerd from dc.resources where not deleted"
    with base.getTableConn() as conn:
        rows = conn.queryToDicts(query)
        return [row['sourcerd'] for row in rows]
def getRDs(args):
    """returns a list of RDs from a list of RD ids or paths.

    RDs that cannot be loaded are reported through base.ui.notifyError
    and left out of the result.
    """
    from gavo import rscdesc
    allRDs = []
    for rdPath in args:
        try:
            allRDs.append(
                rscdef.getReferencedElement(rdPath, forceType=rscdesc.RD))
        except Exception:
            # not a bare except: KeyboardInterrupt and SystemExit must still
            # get through.
            base.ui.notifyError("RD %s faulty, ignored.\n"%rdPath)
    return allRDs
def parseCommandLine():
    """parses the command line of gavo publish.

    Returns the argparse namespace with the attributes all, meta,
    keepTimestamp, unpublish (all booleans) and rd (a list of strings).
    """
    import argparse
    argParser = argparse.ArgumentParser(
        description="Publish services from an RD")
    addOption = argParser.add_argument

    addOption("-a", "--all",
        dest="all", action="store_true",
        help="re-publish all RDs that have been published before (this"
            " will cause a complete re-harvest of all your resources).")
    addOption("-m", "--meta-too",
        dest="meta", action="store_true",
        help="update meta information, too")
    addOption("-k", "--keep-timestamps",
        dest="keepTimestamp", action="store_true",
        help="Preserve the time stamp of the last record modification."
            " This may sometimes be desirable with minor updates to an RD"
            " that don't justify a re-publication to the VO.")
    addOption("-u", "--unpublish",
        dest="unpublish", action="store_true",
        help="Unpublish all resources coming from this RD")
    addOption("rd", type=str, nargs="+",
        help="RD id(s) to publish. (This can also reference files, and"
            " the extension .rd is automatically added)")

    parsed = argParser.parse_args()
    return parsed
def updateRegistryTimestamp():
    """edits the dateupdated field for the registry service in the
    service list (dc.resources) and touches the services RD's timestamp.
    """
    with base.AdhocQuerier(base.getAdminConn) as q:
        regSrv = common.getRegistryService()
        # The service list is dc.resources (it is what RDRscRecIterator bumps
        # for TAP); the registry tables purged in _purgeFromServiceTables
        # include no "services" table, so the old query hit a missing relation.
        q.query("UPDATE dc.resources SET dateupdated=%(now)s"
            " WHERE sourcerd=%(rdId)s AND resId=%(sId)s", {
                "rdId": regSrv.rd.sourceId,
                "sId": regSrv.id,
                "now": datetime.datetime.utcnow(),
            })
    common.getServicesRD().touchTimestamp()
def main():
    """handles the user interaction for gavo publish.

    Touches the services RD's timestamp, publishes the RDs given on the
    command line (or, with --all, all RDs published before), and finally
    calls base.tryRemoteReload for __system__/services (presumably so a
    running server picks up the changes).
    """
    from gavo import rscdesc #noflake: register cache
    args = parseCommandLine()
    common.getServicesRD().touchTimestamp()

    rdIds = findPublishedRDs() if args.all else args.rd
    updateServiceList(getRDs(rdIds),
        metaToo=args.meta,
        keepTimestamp=args.keepTimestamp,
        unpublish=args.unpublish)
    base.tryRemoteReload("__system__/services")


if __name__=="__main__":
    main()