Source code for gavo.user.mkrd

"""
Creation of resource descriptors

The first part of this is an early experiment to automatically create
resource descriptors from structured data representations.

While parts of this may be recoverable for smarter gavo start functionality,
doing this so that the result is actually useful is hard.

Instead, the new gavo start functionality just fetches one of a few
commented RD templates, fills out a thing or two and leaves the rest to
the operator.
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import datetime
import os
import re
import sys

import pkg_resources

from gavo import base
from gavo import grammars
from gavo import rscdef
from gavo import votable
from gavo import utils
from gavo.base import macros
from gavo.grammars import fitsprodgrammar
from gavo.grammars import fitstablegrammar
from gavo.formats import votableread
from gavo.utils import ElementTree
from gavo.utils import fitstools

MS = base.makeStruct


FT_TYPE_MAP = {
	"L": "boolean",
	"B": "bytea",
	"A": "char",
	"I": "smallint",
	"J": "integer",
	"K": "bigint",
	"E": "real",
	"D": "double precision",
}


[docs]def getTypeForFTFormat(format, name): """returns a DaCHS type for FITS table column. """ typeCode = None # variable-length array FITS codes mat = re.match("(\d*)P(.)\(\d*\)$", format) if mat: typeCode = mat.group(2) arrSize = "100" # just a sentinel # "conventional FITS codes else: mat = re.match("(\d*)(.)$", format) if mat: arrSize, typeCode = mat.group(1), mat.group(2) if typeCode not in FT_TYPE_MAP: raise base.ReportableError(f"FITS type code '{format}' of {name}" " not handled by gavo mkrd; add handling if you can.") dbType = FT_TYPE_MAP[typeCode] if arrSize and int(arrSize)>1: if dbType=="char": dbType = "text" else: dbType = dbType+"[]" return dbType
# ======================= Begin deprecated ui and implementation =========== ignoredFITSHeaders = set(["COMMENT", "SIMPLE", "BITPIX", "EXTEND", "NEXTEND", "SOFTNAME", "SOFTVERS", "SOFTDATE", "SOFTAUTH", "SOFTINST", "HISTORY", "BZERO", "BSCALE", "DATAMIN", "DATAMAX"]) wcsKey = re.compile("CD.*|CRVAL.*|CDELT.*|NAXIS.*|CRPIX.*|CTYPE.*|CUNIT.*" "|CROTA.*|RADECSYS|AP?_\d_\d|BP?_\d_\d|LATPOLE|LONPOLE")
[docs]def isIgnoredKeyword(kw): """returns true if kw should not be translated or put into the table. This is important for all WCS keywords when you want to compute SIAP bboxes; these keywords must not be translated. """ return kw in ignoredFITSHeaders or wcsKey.match(kw)
[docs]def structToETree(aStruct): """returns an ElementTree for the copyable content of aStruct. Note that due to manipulations at parse time and non-copyable content, this will, in general, not reproduce the original XML trees. """ nodeStack = [ElementTree.Element(aStruct.name_)] for evType, elName, value in aStruct.iterEvents(): try: if evType=="start": nodeStack.append(ElementTree.SubElement(nodeStack[-1], elName)) elif evType=="end": nodeStack.pop() elif evType=="value": if value is None or value is base.NotGiven: continue if elName=="content_": nodeStack[-1].text = value else: if not isinstance(value, str): # TODO: figure out if something is a reference by inspecting # the attribute definition; meanwhile, just assume it is: value = value.id nodeStack[-1].set(elName, value) else: raise base.Error("Invalid struct event: %s"%evType) except: base.ui.notifyError("Badness occurred in element %s, event %s," " value %s\n"%(elName, evType, value)) raise return nodeStack[-1]
[docs]def makeTableFromFT(rd, srcName, opts): from gavo.utils import pyfits cols, nameMaps = [], {} nameMaker = base.VOTNameMaker() hdus = pyfits.open(srcName) for fitsCol in hdus[1].columns: destName = nameMaker.makeName(fitsCol) if destName!=fitsCol.name: nameMaps[destName] = fitsCol.name cols.append(MS(rscdef.Column, type=getTypeForFTFormat(fitsCol.format, fitsCol.name), name=destName, unit=fitsCol.unit, ucd="", description="FILL IN")) table = rscdef.TableDef(rd, id=opts.tableName, onDisk=True, columns=cols) table.nameMaps = nameMaps hdus.close() return table
[docs]def makeDataForFT(rd, srcName, opts): targetTable = rd.tables[0] # nameMaps left by makeTableFromFT rmkMaps =[MS(rscdef.MapRule, key=key, content_="vars['%s']"%src) for key, src in targetTable.nameMaps.items()] grammar = MS(fitstablegrammar.FITSTableGrammar) sources = MS(rscdef.SourceSpec, pattern=["*.fits"], recurse=True) rowmaker = MS(rscdef.RowmakerDef, id="gen_rmk", idmaps="*", maps=[rmkMaps]) make = MS(rscdef.Make, table=targetTable, rowmaker=rowmaker) return MS(rscdef.DataDescriptor, id="import", sources=sources, grammar=grammar, rowmakers=[rowmaker], make=make)
[docs]def makeTableFromFITS(rd, srcName, opts): keyMappings = [] table = rscdef.TableDef(rd, id=opts.tableName, onDisk=True) hdus = fitstools.openFits(srcName) headerCards = hdus[0].header.cards for index, card in enumerate(headerCards): if isIgnoredKeyword(card.keyword): continue colName = re.sub("[^a-z]", "_", card.keyword.lower()) if not colName: continue if isinstance(card.value, str): type = "text" elif isinstance(card.value, int): type = "integer" else: type = "real" table.feedObject("column", MS(rscdef.Column, name=colName, unit="FILLIN", ucd="FILLIN", type=type, description=card.comment)) keyMappings.append((colName, card.keyword)) rd.setProperty("mapKeys", ", ".join("%s:%s"%(v,k) for k,v in keyMappings)) hdus.close() return table.finishElement()
[docs]def makeDataForFITS(rd, srcName, opts): targetTable = rd.tables[0] dd = rscdef.DataDescriptor(rd, id="import_"+opts.tableName) grammar = fitsprodgrammar.FITSProdGrammar(dd) grammar.feedObject("qnd", True) rowfilter = base.parseFromString(grammars.Rowfilter, """ <rowfilter procDef="//products#define"> <bind key="table">"%s"</bind> <bind key="owner">"FILLIN"</bind> <bind key="embargo">"FILLIN"</bind> </rowfilter>"""%(targetTable.getQName())) grammar.feedObject("rowfilter", rowfilter) grammar.feedObject("mapKeys", MS(grammars.MapKeys, content_=rd.getProperty("mapKeys"))) grammar.finishElement() dd.grammar = grammar dd.feedObject("sources", MS(rscdef.SourceSpec, pattern=["*.fits"], recurse=True)) dd.feedObject("rowmaker", MS(rscdef.RowmakerDef, idmaps="*", id="gen_rmk")) dd.feedObject("make", MS(rscdef.Make, table=targetTable, rowmaker="gen_rmk")) return dd
[docs]def makeTableFromVOTable(rd, srcName, opts): with open(srcName, "rb") as f: rawTable = next(votable.parse(f)) return votableread.makeTableDefForVOTable(opts.tableName, rawTable.tableDefinition, onDisk=True)
[docs]def makeDataForVOTable(rd, srcName, opts): rowmaker = MS(rscdef.RowmakerDef, id="makerows_"+opts.tableName, idmaps="*") # The qualifiedId monkeying is necessary since otherwise the # ReferenceAttribute.unparse thinks it's ok to return the objects raw. # Face it: I've not really had serialization in mind when writing all # this. rowmaker.qualifiedId = rowmaker.id rd.tables[0].qualifiedId = rd.tables[0].id return MS(rscdef.DataDescriptor, grammar=MS(rscdef.getGrammar("voTableGrammar")), sources=MS(rscdef.SourceSpec, pattern=srcName), rowmaker=rowmaker, makes=[MS(rscdef.Make, table=rd.tables[0], rowmaker=rowmaker)])
tableMakers = { "FITS": makeTableFromFITS, "VOT": makeTableFromVOTable, "FT": makeTableFromFT, } dataMakers = { "FITS": makeDataForFITS, "VOT": makeDataForVOTable, "FT": makeDataForFT, }
[docs]def makeRD(args, opts): from gavo import rscdesc rd = rscdesc.RD(None, schema=os.path.basename(opts.resdir), resdir=opts.resdir) for key, value in [ ("title", "FILL-IN"), ("creationDate", utils.formatISODT(datetime.datetime.utcnow())), ("description", "FILL-IN a long text (and maybe do format='plain'" " or even format='rst'"), ("copyright", "Free to use."), ("creator.name", "Author, S."), ("creator", ""), ("creator.name", "Other, A."), ("subject", "One Keyword"), ("subject", "Two Words"), ("content.type", "Catalog"), ("coverage.waveband", "Optical"), ("coverage.profile", "AllSky ICRS"),]: rd.addMeta(key, value) rd.feedObject("table", tableMakers[opts.srcForm](rd, args[0], opts)) rd.feedObject("data", dataMakers[opts.srcForm](rd, args[0], opts)) return rd.finishElement()
[docs]def indent(elem, level=0): i = "\n" + level*"\t" if len(elem): if not elem.text or not elem.text.strip(): elem.text = i + "\t" if not elem.tail or not elem.tail.strip(): elem.tail = i for child in elem: indent(child, level+1) if not child.tail or not child.tail.strip(): child.tail = i if not elem.tail or not elem.tail.strip(): elem.tail = i else: if level and (not elem.tail or not elem.tail.strip()): elem.tail = i
[docs]def writePrettyPrintedXML(root): indent(root) ElementTree.ElementTree(root).write(sys.stdout.buffer, encoding="utf-8")
[docs]def parseCommandLine(): from optparse import OptionParser parser = OptionParser(usage = "%prog [options] <sample>" " DEPRECATED, use dachs start instead") parser.add_option("-f", "--format", help="Input file format: " " FITS, VOT or FT (FITS table)" " Default: Detect from file name", dest="srcForm", default=None, action="store", type="str") parser.add_option("-t", "--table-name", help="Name of the generated table", dest="tableName", default="main", action="store", type="str") parser.add_option("-r", "--resdir", help="Override resdir (and schema)", dest="resdir", default=os.getcwd(), action="store", type="str") opts, args = parser.parse_args() if len(args)!=1: parser.print_help(file=sys.stderr) sys.exit(1) if not opts.srcForm: ext = os.path.splitext(args[0])[1].lower() if ext in set([".xml", ".vot"]): opts.srcForm = "VOT" elif ext==".fits": opts.srcForm = "FITS" else: sys.stderr.write("Cannot guess format, use -f option: %s\n"%args[0]) parser.print_help(file=sys.stderr) sys.exit(1) return opts, args
[docs]def main(): # hack to make id and onDisk copyable so we see them on iterEvent rscdef.TableDef._id.copyable = rscdef.TableDef._onDisk.copyable = True rscdef.DataDescriptor._id.copyable = True opts, args = parseCommandLine() rd = makeRD(args, opts) rd._metaAttr.copyable = True eTree = structToETree(rd) writePrettyPrintedXML(eTree)
# ======================= End deprecated ui and implementation ===========
[docs]def iterColAttrsFITS(table): """yields dicts of column attributes from a FITS binary table HDU. """ hdr = table.header for colInd in range(1, hdr["TFIELDS"]+1): colMeta = [ ("name", re.sub("[^0-9a-z_]", "_", hdr[f"TTYPE{colInd}"].lower())), ("type", getTypeForFTFormat( hdr[f"TFORM{colInd}"], hdr[f"TTYPE{colInd}"])), ("ucd", hdr.get(f"TUCD{colInd}", None)), ("utype", hdr.get(f"TUTYP{colInd}", None)), ("unit", hdr.get(f"TUNIT{colInd}", None)), ("description", hdr.get(f"TCOMM{colInd}", None ) or hdr.cards[f"TTYPE{colInd}"].comment or ""), ("verbLevel", 1),] yield dict((k,v) for k,v in colMeta if v is not None)
[docs]def iterColAttrsVOTable(inFile): """tries to parse (binary) inFile as a VOTable and yields colAttrs dictionaries for all FIELDs in it. """ for el in votable.parse(inFile, watchset=[votable.V.FIELD]): if isinstance(el, votable.V.FIELD): colMeta = [ ("name", el.name), ("unit", el.unit), ("ucd", el.ucd or ""), ("utype", el.utype), ("type", base.voTableToSQLType(el.datatype, el.arraysize, el.xtype)), ("description", el.getDescription()), ("verbLevel", 1),] yield dict((k,v) for k,v in colMeta if v is not None)
[docs]class VizBBBParser(object): """A simple and naive parser for VizieR Byte-by-Byte descriptions. The results are available in the colAttrs and bytesForCols attributes. """ bbbLinePat = re.compile( r"\s*(?P<from>\d+)\s*-\s*(?P<to>\d+)" r"\s+(?P<format>[A-Z0-9.]+)" r"\s+(?P<units>[^\s]*)" r"\s+(?P<label>[^\s]*)" r"\s+(?P<description>.*)") tableBorderPat = re.compile("--------+") def __init__(self): self.state = "scanning" self.infoSoFar = {} self.colAttrs = [] self.bytesForCols = [] def _parse_finished(self, ln): """we've found and parsed the BBB table. """ pass def _parse_scanning(self, ln): """we're waiting for the table header of the first BBB block. """ if ln.startswith("Byte-by-byte Description"): self.state = "skip_header" def _parse_skip_header(self, ln): """we're skipping over the table header of the first BBB block. """ mat = self.bbbLinePat.match(ln) if mat: self.state = "bbb_colhead" self._parse_bbb_colhead(ln, mat) elif self.tableBorderPat.match(ln) or re.match("\s*Bytes", ln): pass else: raise Exception(f"Expected BBB table, got '{ln}'") def _parse_bbb_colhead(self, ln, mat=None): """we're expecting either a BBB column or the end of the table. """ if mat is None: mat = self.bbbLinePat.match(ln) if not mat: if self.tableBorderPat.match(ln): self.state = "finished" else: raise Exception("Expected BBB column def, got '{ln}'") self.infoSoFar = mat.groupdict() self.state = "bbb_moreexpl" def _parse_bbb_moreexpl(self, ln): """we're expecting either a BBB column, a continuation line for the explanation, or the end of the table. In this state, there's non-shipped column data in infoSoFar. """ mat = self.bbbLinePat.match(ln) if mat: self._shipout() self.state = "bbb_colhead" self._parse_bbb_colhead(ln, mat) elif self.tableBorderPat.match(ln): self._shipout() self.state = "finished" else: self.infoSoFar["description"] += (" "+ln.strip()) def _getTypeForTypeCode(self, typeCode): """returns one of our SQL types for a VizieR BBB type code. """ baseType = typeCode[0] if baseType=="A": return "text" elif baseType=="I": return { "I1": "smallint", "I2": "smallint", "I4": "integer"}.get(typeCode, "bigint") elif baseType in "FE": if int(typeCode[1:].split(".")[0])>7: return "double precision" else: return "real" def _shipout(self): """takes infoSoFar and makes bytesForCols and colAttrs entries from it. """ unit = self.infoSoFar["units"] rec = [ ("name", self.infoSoFar["label"]), ("unit", None if unit=="---" else unit), ("ucd", ""), ("verbLevel", "1"), ("type", self._getTypeForTypeCode(self.infoSoFar["format"])), ("description", self.infoSoFar["description"].strip())] self.colAttrs.append(dict((k,v) for k,v in rec if v is not None)) self.bytesForCols.append("{label}: {from}-{to}".format(**self.infoSoFar))
[docs] def feed(self, line): getattr(self, "_parse_"+self.state)(line)
[docs]def iterColAttrsViz(inFile, dumpRanges=False): """tries to parse (text) inFile as a VizieR-Style bytes-by-bytes description and yields colAttrs for the first described table. dumpRanges is for interactive use, where the function additionally dumps a colDefs content to stdout. """ parser = VizBBBParser() for ln in inFile: parser.feed(ln) if parser.state!="finished": base.ui.notifyWarning("Probably invalid VizieR byte-by-byte: parser" f" ended up in {parser.state}") if dumpRanges: print("\n".join(parser.bytesForCols)+"\n") return parser.colAttrs
COLUMN_ELEMENT_TEMPLATES = ['<column', ' name="{name}"', ' type="{type}"', '\n ', 'unit="{unit}" ', 'ucd="{ucd}"', '\n utype="{utype}"', '\n tablehead="{tablehead}"', '\n description="{description}"', '\n displayHint="{displayHint}"', '\n verbLevel="{verbLevel}"', "/>"]
[docs]def formatColumnElement(colAttrs): """returns a column element XML literal for colAttrs in Markus' preferred formatting. """ accum = [] for template in COLUMN_ELEMENT_TEMPLATES: try: accum.append(template.format(**colAttrs)) except KeyError: # skip this template element as there's no value for it pass return "".join(accum)
[docs]def getColumnXML(colAttrs): """returns formatted XML for a sequence for column attributes. colAttrs is a sequence of dicts with fillers into COLUMN_ELEMENT_TEMPLATES; the must already be escaped for use in XML attributes. """ accum = [] for fillers in colAttrs: accum.append(formatColumnElement(fillers)) return "\n".join(accum)
[docs]def parseGenColCommandLine(): import argparse parser = argparse.ArgumentParser(description="Write column definitions" " for representing a table") parser.add_argument("table", metavar="FILE", action="store", type=str, help="Source file to generate the columns from." " This can be a FITS, a VOTable, or Vizier Column-by-column" " description, and DaCHS will guess based on the extension. Use" " the format option if it guesses wrong") parser.add_argument("-f", "--format", dest="format", action="store", choices=["fits", "vot", "viz"], help="Format of the input table: FITS binary, VOTable, or Vizier" " byte-by-byte.") return parser.parse_args()
[docs]def gencol(): """a UI function generating columns element from FITS binary tables. """ from gavo.utils import pyfits args = parseGenColCommandLine() if args.table.endswith(".fits") or args.format=="fits": try: table = pyfits.open(args.table)[1] colIterator = iterColAttrsFITS(table) except Exception as exc: base.ui.notifyError(f"Cannot open {args.table} as a FITS binary table: " +str(exc)) sys.exit(1) elif (args.table.endswith(".vot") or args.table.endswith(".xml") or args.format=="vot"): try: with open(args.table, "rb") as f: colIterator = list(iterColAttrsVOTable(f)) except Exception as exc: base.ui.notifyError(f"Cannot open {args.table} as a VOTable: " +str(exc)) elif (args.table.endswith(".txt") or args.table=="README" or args.format=="viz"): try: with open(args.table, "rb") as f: colIterator = iterColAttrsViz(f, True) except Exception as exc: base.ui.notifyError(f"Cannot open {args.table} as a VizieR" " Byte-by-Byte file: "+str(exc)) else: base.ui.notifyError(f"Do not know how to read {args.table}. Please" " use --format") sys.exit(1) print(utils.fixIndentation( getColumnXML( colIterator), " "))
# ====================== Begin templating ========================= def _listProtocols(): """writes to stdout a list of protocols we can generate RD templates for. This is read from the resources coming with DaCHS, where we evaluate a magic =tpldesc Desc= string that must be embedded in the template. """ templateDir ="resources/src/" for fName in pkg_resources.resource_listdir( "gavo", templateDir): mat = re.match(r"template-(.*)\.rd_$", fName) if not mat: continue protName = mat.group(1) with pkg_resources.resource_stream('gavo', templateDir+fName) as f: tplHead = f.read(1000).decode("utf-8") mat = re.search(r"\\tpldesc{(.*?)}", tplHead) if not mat: continue protDesc = mat.group(1) print("%s -- %s"%(protName, protDesc))
[docs]def parseStartCommandLine(): import argparse parser = argparse.ArgumentParser(description="Write a template" " q.rd for a certain data type") parser.add_argument("protocol", metavar="PROTO", action="store", type=str, help="Generate an RD template for PROTO; use list to" " see what is available.") args = parser.parse_args() if args.protocol=="list": _listProtocols() sys.exit(0) return args
[docs]class TemplateMacroPackage(macros.MacroPackage): """Macros for RD templates. """
[docs] def macro_tpldesc(self, description): """A silent macro used for self-documentation. """ return ""
[docs] def macro_now(self): """returns an ISO representation of just about now UTC. """ return utils.formatISODT(datetime.datetime.utcnow())
[docs] def macro_resdir(self): """returns the last element of the current path. This is assumed to be the intended resource directory. """ return os.path.split(os.getcwd())[-1]
[docs] def macro_commonmeta(self): """expands to the content of tpart-commonmeta.xml. """ with pkg_resources.resource_stream('gavo', "resources/src/tpart-commonmeta.xml") as f: return f.read().decode("utf-8")
[docs]class MkrdMacroProcessor(macros.MacroExpander): """a macro expander for RD templates. """ def __init__(self): macros.MacroExpander.__init__(self, TemplateMacroPackage())
[docs]def start(): args = parseStartCommandLine() outputName = "q.rd" try: source = pkg_resources.resource_stream('gavo', "resources/src/template-%s.rd_"%args.protocol) except IOError: base.ui.notifyError("No template for %s."%args.protocol) sys.exit(1) if os.path.exists(outputName): base.ui.notifyError( "Output %s already exists. Move it away and try again."%outputName) sys.exit(1) else: proc = MkrdMacroProcessor() rdSource = proc.expand(source.read().decode("utf-8")) with open(outputName, "wb") as dest: dest.write(rdSource.encode("utf-8")) source.close()