Source code for gavo.user.validation

"""
A cli-facing module providing functionality to "validate" one or more
resource descriptors.

Validation means giving some prognosis as to whether RD will properly work
within both the DC and the VO.

While validation is active there's base.VALIDATING=True.  If RDs
do anything expensive, they're advised to have something like::

	if getattr(base, "VALIDATING", False):
		(don't do the expensive thing)
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import argparse
import itertools
import re
import sys
import traceback


from gavo import api
from gavo import adql
from gavo import base
from gavo import svcs
from gavo import stc
from gavo import registry
from gavo import utils
from gavo.helpers import testtricks
from gavo.imp import astropyucd
from gavo.registry import builders
from gavo.protocols import datalink
from gavo.protocols import vocabularies
from gavo.user import errhandle
from gavo.web import htmltable

from gavo.web import examplesrender #noflake: for RST registration

# NOTE(review): this sets a module-level flag on gavo.registry.builders as
# soon as this module is imported; presumably it tells the builders that
# they run under validation -- confirm against gavo.registry.builders.
builders.VALIDATING = True

# have some non-UAT keywords that we swallow for some reason or other
# (validateServices does not warn about subjects listed here)
NON_UAT_SUBJECTS = {"stars", "DOI"}

# just accept some UCDs mentioned in standards; some of
# these have actually been made legal later, but we don't want to
# report them even with old astropys
# (validateTables skips the astropy UCD check for UCDs listed here)
UCD_WHITELIST = {'instr.fov', "em.line"}


class TestsCollector(object):
    """a singleton that collects use cases to run.

    Don't instantiate, this is a global singleton.

    The testsToRun attribute contains the test suites to run.
    """
    # class-level accumulator shared by everyone; addRD grows it in place
    testsToRun = []

    @classmethod
    def addRD(cls, rd):
        """adds tests from rd.

        All items of rd.tests are appended to testsToRun, in order.
        """
        cls.testsToRun.extend(rd.tests)
def outputDependentMessage(aString):
    """an output function for errhandle.raiseAndCatch.

    It prints aString to stdout with a blank prepended to every line;
    this is how dependent error messages get indented here.
    """
    indented = re.sub("(?m)^", " ", aString)
    print(indented)
def outputError(rdId, message, verbose=False):
    """prints an [ERROR] line for rdId with message to stdout.

    If verbose is true, errhandle.raiseAndCatch is called afterwards with
    outputDependentMessage as its output function (presumably this reports
    the exception currently being handled -- see gavo.user.errhandle).
    """
    print("[ERROR] %s: %s"%(rdId, message))
    if not verbose:
        return
    errhandle.raiseAndCatch(output=outputDependentMessage)
def outputWarning(rdId, message, verbose=False):
    """prints a [WARNING] line for rdId with message to stdout.

    If verbose is true, errhandle.raiseAndCatch is called afterwards with
    outputDependentMessage as its output function (presumably this reports
    the exception currently being handled -- see gavo.user.errhandle).
    """
    print("[WARNING] %s: %s"%(rdId, message))
    if not verbose:
        return
    errhandle.raiseAndCatch(output=outputDependentMessage)
def loadRD(rdId):
    """returns the RD identified by rdId.

    If that fails, diagnostics are printed and None is returned.

    Only the api exceptions handled below are turned into diagnostics;
    anything else propagates to the caller.
    """
    # only set once loading has fully succeeded, so a failure half-way
    # through still yields None
    loaded = None

    try:
        candidate = api.getReferencedElement(rdId, doQueries=False)

        # This is so we can validate userconfig.rd
        if hasattr(candidate, "getRealRD"):
            candidate = candidate.getRealRD()

        loaded = candidate
    except api.RDNotFound:
        outputError(rdId, "RD or dependency not found, message follows", True)
    except api.LiteralParseError:
        outputError(rdId, "Bad literal in RD, message follows", True)
    except api.StructureError:
        outputError(rdId, "Malformed RD input, message follows", True)
    except api.Error:
        outputError(rdId, "Syntax or internal error, message follows", True)

    return loaded
# shared validator instance; validateServices and validateTables call its
# assertValidates method on rendered registry records.
_XSD_VALIDATOR = testtricks.XSDTestMixin()
def isIVOPublished(res):
    """returns true if res has a publication facing the VO.

    For resources with a registration attribute (data items), that's
    when "ivo_managed" is in registration.sets.  Everything else is
    treated as a service, which is IVO-published as soon as one of its
    publications has "ivo_managed" in its sets.
    """
    if hasattr(res, "registration"):
        # it's a data item
        return "ivo_managed" in res.registration.sets

    # it's a service: one VO-facing publication is enough
    return any("ivo_managed" in pub.sets for pub in res.publications)
def iterPublishedResources(rd, args):
    """iterates over the resources in rd that are candidates for
    publication checks.

    These are, in this order: the services having publications (or all
    services if args.prePublication is set), all resRecs, and then those
    tables and dds that have a registration.
    """
    yield from (
        svc for svc in rd.services
        if args.prePublication or svc.publications)

    yield from rd.resRecs

    yield from (
        res for res in itertools.chain(rd.tables, rd.dds)
        if res.registration)
def validateServices(rd, args):
    """outputs to stdout various diagnostics about the services on rd.

    For each resource coming out of iterPublishedResources, this checks
    the structure/metadata, warns about subjects that are not UAT terms
    (for IVO-published resources), makes sure an identifier can be
    generated, and builds and XSD-validates the registry record.

    Returns False if any error (as opposed to a warning) was reported,
    True otherwise.  Failures of api.getMetaText on the identifier are
    not caught here and propagate to the caller.
    """
    validSoFar = True

    for res in iterPublishedResources(rd, args):
        try:
            base.validateStructure(res)
        except api.MetaValidationError as ex:
            validSoFar = False
            outputError(rd.sourceId, "Missing metadata for publication of"
                " service %s:\n%s"%(res.id, str(ex)))
            continue # further checks will just add verbosity

        # see if subject keywords are from UAT
        if isIVOPublished(res):
            uat = vocabularies.get_vocabulary("uat")
            for subject in res.iterMeta("subject", propagate=True):
                subjectText = str(subject)
                if (subjectText not in uat["terms"]
                        and subjectText not in NON_UAT_SUBJECTS):
                    outputWarning(rd.sourceId, "Service %s has subject %s"
                        ", which is not from http://ivoa.net/rdf/uat."%(
                        res.id, subject))

        if isinstance(res, svcs.Service):
            if not (args.prePublication or isIVOPublished(res)):
                # require sane metadata on services only if the VO will see
                # the service
                continue

        # NOTE(review): the checks from here on are taken to apply to every
        # resource that got this far, not just to services -- confirm
        # against the upstream indentation.

        # error out if the identifier cannot be generated
        api.getMetaText(res, "identifier")

        registryRecord = None
        try:
            registryRecord = builders.getVORMetadataElement(res)
        except stc.STCSParseError as msg:
            validSoFar = False
            outputError(rd.sourceId, "Invalid STC-S (probably in coverage meta)"
                ": %s"%str(msg))
        except Exception:
            # Exception rather than a bare except so that KeyboardInterrupt
            # and SystemExit are not reported as validation problems.
            validSoFar = False
            outputError(rd.sourceId, "Error when producing registry record"
                " of service %s:"%res.id, True)

        # make sure the registry record is XSD-valid
        if registryRecord is not None:
            try:
                _XSD_VALIDATOR.assertValidates(
                    registryRecord.render(), leaveOffending=True)
            except AssertionError as msg:
                validSoFar = False
                outputError(rd.sourceId, "Invalid registry record for service"
                    " %s:\n%s"%(res.id, str(msg)))

    return validSoFar
def validateRST(rd, args):
    """outputs diagnostics on RST formatting problems.

    This walks rd and all its children (as returned by iterChildren)
    and, for every meta item with format 'rst', emits a warning if
    utils.rstxToHTMLWithWarning returns a message for it.

    RST problems only produce warnings, never errors.  args is not used.
    """
    def checkElement(el):
        # elements without getAllMetaPairs are treated as having no meta
        getPairs = getattr(el, "getAllMetaPairs", lambda: [])
        for key, val in getPairs():
            if val.format!='rst':
                continue

            content = val.getExpandedContent(macroPackage=el)
            _, msg = utils.rstxToHTMLWithWarning(content)
            if msg:
                outputWarning(rd.sourceId,
                    "%s metadata on %s (%s) has an RST problem: %s"%(
                        key, el, utils.makeEllipsis(content, 80), msg))

        validSoFar = True
        for child in el.iterChildren():
            if child:
                validSoFar = validSoFar and checkElement(child)
        return validSoFar

    return checkElement(rd)
def validateRowmakers(rd, args):
    """tries to build all rowmakers mentioned in the RD and bails out
    if one is bad.

    "Bailing out" means that whatever api.TableForDef or the rowmaker
    compilation raise propagates to the caller; if nothing is raised,
    True is returned.  args is not used.

    While a make is being checked, its table's onDisk attribute is
    forced to False (presumably so that no database table is touched --
    confirm against api.TableForDef); the previous value is put back
    afterwards.
    """
    for dd in rd:
        for m in dd.makes:
            # remember the declared state so in-memory tables are not
            # turned into on-disk ones as a side effect of validation
            wasOnDisk = m.table.onDisk
            m.table.onDisk = False
            try:
                api.TableForDef(m.table)
                m.rowmaker.compileForTableDef(m.table)
            finally:
                m.table.onDisk = wasOnDisk
    return True
def validateOtherCode(rd, args):
    """tries to compile other pieces of code in an RD and bails out
    if one is bad.

    What is compiled: the regression tests in rd.tests, the formatters
    of the services' current output fields, the descriptor generator,
    data functions, and data formatter of datalink cores (depending on
    which of dlmeta and dlget the service allows), and the code of
    rd.jobs.

    Every failure is reported through outputError; compilation goes on
    with the next item.  Returns False if anything failed to compile,
    True otherwise.  args is not used.
    """
    retval = True

    # regression tests embedded in the RD
    for suite in rd.tests:
        for test in suite.tests:
            try:
                test.compile()
            except Exception as msg:
                outputError(rd.sourceId, "Bad test '%s': %s"%(test.title, msg))
                retval = False

    for svc in rd.services:
        # custom formatters on output fields
        for outputField in svc.getCurOutputFields():
            if outputField.formatter:
                try:
                    htmltable._compileRenderer(outputField.formatter, None, rd)
                except Exception as msg:
                    outputError(rd.sourceId, "Bad formatter on output field '%s': %s"%(
                        outputField.name, msg))
                    retval = False

        # code embedded in datalink cores; only what the allowed
        # renderers would actually use is compiled
        if isinstance(svc.core, datalink.DatalinkCore):
            try:
                if "dlmeta" in svc.allowed:
                    svc.core.descriptorGenerator.compile(svc.core)
                if "dlget" in svc.allowed:
                    for df in svc.core.dataFunctions:
                        df.compile(svc.core)
                    svc.core.dataFormatter.compile(svc.core)
            except Exception as msg:
                outputError(rd.sourceId, "Bad datalink function in service '%s': %s"%(
                    svc.id, msg))
                # BadCode exceptions carry the offending source; show it, too
                if isinstance(msg, base.BadCode):
                    outputError(rd.sourceId, "Bad code:\n%s"%msg.code)
                retval = False

    # code of the jobs declared in the RD
    for job in rd.jobs:
        try:
            job.job.compile(parent=rd)
        except Exception as msg:
            outputError(rd.sourceId, "Bad code in job '%s': %s"%(
                job.title, msg))
            retval = False

    # TODO: iterate over service/cores and standalone cores and
    # fiddle out condDescs
    # TODO: Iterate over scripts and data/make/scripts, see which
    # are python and try to compile them
    # TODO: Iterate over grammars and validate rowfilters

    return retval
def validateTables(rd, args):
    """does some sanity checks on the (top-level) tables within rd.

    Per table, this validates the structure/metadata; per column, it
    checks units, UCDs (unless they are in UCD_WHITELIST), and whether
    the name is a regular ADQL identifier.  With args.compareDB, on-disk
    tables that exist in the database are compared against their
    definition.  Finally, _associatedDatalinkService metadata is
    checked for resolvable services and columns, and for tables with a
    registration the registry record is built and XSD-validated.

    Unit-not-interoperable, UCD, and identifier problems are warnings
    only.  Returns False if any error was reported, True otherwise.
    """
    valid = True

    identifierSymbol = adql.getSymbols()["identifier"]

    for td in rd.tables:
        curTableName = td.getQName()
        try:
            base.validateStructure(td)
        except api.MetaValidationError as ex:
            valid = False
            outputError(rd.sourceId, "Missing metadata in"
                " table %s:\n%s"%(td.id, str(ex)))
            continue # further checks will just add verbosity

        for col in td:
            # units: unparseable ones are errors, parseable but
            # non-valid ones just warnings
            try:
                if col.unit:
                    parsedUnit = api.parseUnit(col.unit)
                    if not parsedUnit.isValid:
                        outputWarning(rd.sourceId,
                            f"Column {curTableName}.{col.name}: Unit {col.unit} is"
                            " not interoperable (in VOUnit's list of recognised units)")
            except api.BadUnit:
                valid = False
                outputError(rd.sourceId, "Bad unit in table %s, column %s: %s"%(
                    curTableName, col.name, repr(col.unit)))

            if col.ucd and col.ucd not in UCD_WHITELIST:
                try:
                    astropyucd.parse_ucd(col.ucd,
                        check_controlled_vocabulary=True,
                        has_colon=True)
                except ValueError as msg:
                    outputWarning(rd.sourceId,
                        f"Column {curTableName}.{col.name}: UCD {col.ucd}"
                        f" not accepted by astropy ({msg}).")

            try:
                identifierSymbol.parseString(str(col.name), parseAll=True)
            except base.ParseException:
                outputWarning(rd.sourceId, "Column %s.%s: Name is not a regular"
                    " ADQL identifier."%(td.id, col.name))

        if td.onDisk and args.compareDB:
            with base.getTableConn() as conn:
                q = base.UnmanagedQuerier(conn)
                # tables not (yet) in the database are silently skipped
                if q.getTableType(curTableName) is not None:
                    t = api.TableForDef(td, connection=conn)
                    try:
                        t.ensureOnDiskMatches()
                    except api.DataError as msg:
                        valid = False
                        outputError(rd.sourceId,
                            utils.makeEllipsis(utils.safe_str(msg), 160))

        # associated datalink services and the columns must exist.
        for dldef in td.iterMeta("_associatedDatalinkService"):
            try:
                # both calls are made for their exceptions only
                base.resolveId(td.rd, base.getMetaText(dldef, "serviceId"))
                td.getColumnByName(base.getMetaText(dldef, "idColumn"))
            except (base.NotFoundError, base.MetaError) as msg:
                valid = False
                outputError(rd.sourceId,
                    utils.makeEllipsis(utils.safe_str(msg), 160))

        if td.registration:
            registryRecord = None
            try:
                registryRecord = builders.getVORMetadataElement(td)
            except Exception as msg:
                valid = False
                outputError(rd.sourceId,
                    "Table publication of %s could not be built: %s"%(
                        td.id, str(msg)))

            if registryRecord is not None:
                try:
                    _XSD_VALIDATOR.assertValidates(
                        registryRecord.render(), leaveOffending=True)
                except AssertionError as msg:
                    valid = False
                    outputError(rd.sourceId, "Invalid registry record for table"
                        " %s:\n%s"%(td.id, str(msg)))

    return valid
def validateOne(rdId, args):
    """outputs to stdout various information on the RD identified by rdId.

    Infos and warnings collected while loading the RD are printed as
    warnings.  If args.runTests is set, the RD's regression tests are
    registered with TestsCollector.

    Returns True if all validators passed, False if the RD could not be
    loaded or a validator reported an error.  The validators run in a
    fixed order, and the first one failing keeps the later ones from
    running.
    """
    with testtricks.collectedEvents("Info", "Warning") as warnings:
        rd = loadRD(rdId)
        if rd is None:
            # loadRD has already printed diagnostics; explicitly report
            # failure rather than falling off with None.
            return False

    for warning in warnings:
        outputWarning(rdId, warning[1])

    if args.runTests:
        TestsCollector.addRD(rd)

    validators = (
        validateServices,
        validateRowmakers,
        validateTables,
        validateOtherCode,
        validateRST)
    # all() stops at the first failing validator
    return all(validator(rd, args) for validator in validators)
def validateAll(args):
    """validates all accessible RDs.

    If the first item of args.rd is "ALL", the RD ids come from
    registry.findPublishedRDs, otherwise from registry.findAllRDs.

    Exceptions escaping validateOne are reported on stderr together
    with a traceback, and validation continues with the next RD.  With
    args.verbose, the RD ids are written to stdout as they are
    processed.
    """
    findRDs = (registry.findPublishedRDs
        if args.rd[0]=="ALL"
        else registry.findAllRDs)

    for rdId in findRDs():
        if args.verbose:
            sys.stdout.write(rdId+" ")
            sys.stdout.flush()

        try:
            validateOne(rdId, args)
        except Exception:
            sys.stderr.write("Severe error while validating %s:\n"%rdId)
            traceback.print_exc()

    if args.verbose:
        sys.stdout.write("\n")
def parseCommandLine(argv=None):
    """returns the parsed command line arguments for the validator.

    argv is a list of argument strings; if it is None (the default),
    argparse takes the arguments from sys.argv.

    The returned namespace has the attributes rd (a list of at least one
    string), prePublication, verbose, runTests, compareDB (booleans),
    and timeout (an int, defaulting to 15).
    """
    parser = argparse.ArgumentParser(description="Check RDs for well-formedness"
        " and some aspects of VO-friendlyness")
    parser.add_argument("rd", nargs="+", type=str,
        help="RD identifier or file system path.  Use magic value ALL to"
        " check all published RDs, ALL_RECURSE to look for RDs in the file"
        " system.")
    parser.add_argument("-p", "--pre-publication", help="Validate"
        " as if all services were IVOA published even if they are not"
        " (this may produce spurious errors if unpublished services are in"
        " the RD).",
        action="store_true", dest="prePublication")
    parser.add_argument("-v", "--verbose", help="Talk while working",
        action="store_true", dest="verbose")
    parser.add_argument("-t", "--run-tests", help="Run regression tests"
        " embedded in the checked RDs", action="store_true", dest="runTests")
    parser.add_argument("-T", "--timeout", help="When running tests, abort"
        " and fail requests after inactivity of SECONDS",
        action="store", dest="timeout", type=int, default=15, metavar="SECONDS")
    parser.add_argument("-c", "--compare-db", help="Also make sure that"
        " tables that are on disk (somewhat) match the definition in the RD.",
        action="store_true", dest="compareDB")

    return parser.parse_args(argv)
def main():
    """the entry point of the RD validator.

    This sets base.VALIDATING, parses the command line, and then either
    validates all RDs (if the only argument is ALL or ALL_RECURSE) or
    each RD given, printing OK or Fail per RD.  With the run-tests
    option, the regression tests collected during validation are run
    afterwards and a report is printed.
    """
    base.VALIDATING = True
    args = parseCommandLine()

    wantsAll = len(args.rd)==1 and args.rd[0] in ("ALL", "ALL_RECURSE")
    if wantsAll:
        validateAll(args)
    else:
        for rdId in args.rd:
            print(rdId, "--", end=' ')
            sys.stdout.flush()
            print("OK" if validateOne(rdId, args) else "Fail")

    if not args.runTests:
        return

    print("\nRunning regression tests\n")
    # imported only when tests are actually requested
    from gavo.rscdef import regtest
    runner = regtest.TestRunner(TestsCollector.testsToRun,
        verbose=False, timeout=args.timeout)
    runner.runTests(showDots=True)
    print(runner.stats.getReport())

    if runner.stats.fails:
        print("\nThe following tests failed:\n")
        print(runner.stats.getFailures())