Source code for gavo.protocols.datapack

"""
Dumping our resources to frictionless data packages (henceforth: datapack)
and loading from them again.

Specifications:

* https://specs.frictionlessdata.io/data-package/
* https://specs.frictionlessdata.io/data-resource/

DaCHS-generated data packages can be recognised by the presence of
a dachs-resdir key in the global metadata.  Also, we always write
the RD as the first resource; for good measure, we also mark it by
giving it the name dachs-resource-descriptor.
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import argparse
import glob
import json
import os
import shutil
import subprocess
import zipfile
from typing import Any, Callable, Generator

from gavo import base
from gavo import rscdef
from gavo import rscdesc
from gavo import utils


def _perhaps(d:dict, key:str, val:Any) -> None:
	"""sets d[key] to val, but only if val is non-empty.

	val may be a string or a list.
	"""
	if val:
		d[key] = val
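# For example, _perhaps(res, "keywords", []) leaves res alone, while a
# non-empty string or list is stored under the key.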


def makeBasicMeta(rd:rscdesc.RD) -> dict:
	"""returns a basic, resource-less datapack descriptor from an RD.
	"""
	res = {
		"dachs-resdir": utils.getRelativePath(
			rd.resdir, base.getConfig("inputsDir")),
		"name": rd.schema,
		"homepage": base.makeAbsoluteURL("/browse/"+rd.sourceId),
		# TODO: We'd like to have a custom profile for DaCHS.  We'll
		# need to put in a json schema for that, though.  Look into it.
		"profile": "data-package",
	}
	_perhaps(res, "id", base.getMetaText(rd, "doi", acceptSequence=True))

	licenses = []
	for item in rd.iterMeta("rights"):
		try:
			uriMeta = base.getMetaText(item, "rightsURI")
			if uriMeta:
				licenses.append({"path": str(uriMeta)})
		except base.NoMetaKey:
			pass
	_perhaps(res, "licenses", licenses)

	contributors = []
	for item in rd.iterMeta("creator"):
		contributors.append({
			"title": base.getMetaText(item, "name", "Unknown"),
			"role": "author"})
	_perhaps(res, "contributors", contributors)

	subjects = []
	for item in rd.iterMeta("subject"):
		subjects.append(str(item))
	_perhaps(res, "keywords", subjects)

	for ourKW, theirKW in [
			("title", None),
			("description", None),
			("version", None),
			("creationDate", "created")]:
		_perhaps(res, theirKW or ourKW,
			base.getMetaText(rd, ourKW, acceptSequence=True))

	return res

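# A usage sketch (the RD id is hypothetical):
#
#   rd = rscdesc.getRD("myres/q")
#   meta = makeBasicMeta(rd)
#
# meta then has at least dachs-resdir, name, homepage, and profile;
# id, licenses, contributors, and keywords appear only if the RD's
# meta provides them.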
def namer(template:str) -> Generator[str, None, None]:
	"""yields an endless sequence of names made by filling template
	with a running integer.
	"""
	i = 0
	while True:
		yield template%i
		i += 1

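# For example, a namer yields successive instantiations of its template:
#
#   makeName = namer("data-%05d")
#   next(makeName)   # -> "data-00000"
#   next(makeName)   # -> "data-00001"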
def iterExtraResources(
		rd:rscdesc.RD,
		cleanPath:Callable[[str], str]) -> Generator[dict, None, None]:
	"""yields datapack resources from the datapack-extrafiles property.

	This property is a json sequence of glob patterns; files are only
	returned if they exist.  Directories are ignored.
	"""
	makeName = namer("extrafile-%04d")

	pathLiteral = rd.getProperty("datapack-extrafiles", None)
	if pathLiteral is None:
		return

	patterns = json.loads(pathLiteral)
	for pattern in patterns:
		for match in glob.glob(rd.getAbsPath(pattern)):
			if os.path.isfile(match):
				yield {
					"name": next(makeName),
					"profile": "data-resource",
					"path": cleanPath(match)}

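# The property's value is a JSON list of glob patterns interpreted
# relative to the resource directory; a hypothetical example:
#
#   ["docs/*.pdf", "bin/preprocess.sh"]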
def iterRDResources(rd:rscdesc.RD) -> Generator[dict, None, None]:
	"""yields datapack resource descriptions for the RD and all
	ancillary files we can discover.

	All path names here are relative to the RD's resource directory.
	Anything outside of it will not be exported (without serious
	trickery, that is).
	"""
	resdir = os.path.normpath(rd.resdir)
	def cleanPath(path):
		return utils.getRelativePath(path, resdir, liberalChars=True)

	# the RD itself
	yield {
		"name": "dachs-resource-descriptor",
		"path": cleanPath(rd.srcPath),
		"profile": "data-resource",
		"title": "The DaCHS resource descriptor",
		"mediatype": "text/xml",
	}

	# a README, if it's there
	if os.path.exists(rd.getAbsPath("README")):
		yield {
			"name": "readme",
			"path": "README",
			"profile": "data-resource",
			"mediatype": "text/plain",
		}

	# manually added files
	for res in iterExtraResources(rd, cleanPath):
		yield res

	# extra instrumentation in getRDForDump creates the filesLoaded
	# attribute, which we use to add ancillary files used by the RD
	# TODO: catch a few more and add them in getRDForDump
	makeName = namer("rdaux-%02d")
	# filesLoaded is deduplicated because, e.g., multiple data
	# elements might use the same custom grammar.
	for extra in sorted(set(rd.filesLoaded)):
		fullPath = os.path.join(rd.resdir, extra)
		# hack: external python modules drop the .py in the attribute value
		if not os.path.exists(fullPath) and os.path.exists(fullPath+".py"):
			extra = extra+".py"
		yield {
			"name": next(makeName),
			"profile": "data-resource",
			"path": extra,
		}

	# files imported
	makeName = namer("data-%05d")
	for dd in rd.dds:
		if dd.sources and dd.sources.ignoredSources.ignoresVaryingStuff():
			base.ui.notifyWarning(
				"data %s#%s ignored because of dynamic ignoreSources"%(
					rd.sourceId, dd.id))
			continue

		for source in dd.iterSources():
			if isinstance(source, str) and os.path.exists(source):
				yield {
					"name": next(makeName),
					"profile": "data-resource",
					"path": cleanPath(source)}
			else:
				# it's probably some artificial source token; don't even
				# bother to report that anything looks weird.
				pass

def makeDescriptor(rd:rscdesc.RD) -> dict:
	"""returns a datapack descriptor in a python dictionary.
	"""
	desc = makeBasicMeta(rd)
	desc["resources"] = list(iterRDResources(rd))
	return desc

def getRDForDump(rdId:str) -> rscdesc.RD:
	"""loads an RD for later dumping.

	The main thing this does is instrument ResdirRelativeAttribute
	(and possibly, later, other things) to record what ancillary data
	the RD has loaded.

	This is, of course, not thread-safe or anything, and it could collect
	false positives when RDs reference or include other RDs.  Only use it
	while making datapacks.
	"""
	origParse = rscdef.ResdirRelativeAttribute.parse
	filesLoaded = []

	def record(self, val):
		filesLoaded.append(val)
		return origParse(self, val)

	try:
		rscdef.ResdirRelativeAttribute.parse = record
		rd = rscdesc.getRD(rdId)
		rd.filesLoaded = filesLoaded
	finally:
		rscdef.ResdirRelativeAttribute.parse = origParse

	return rd

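# A usage sketch (hypothetical RD id): after
#
#   rd = getRDForDump("myres/q")
#
# rd.filesLoaded contains the resdir-relative paths of ancillary files
# (custom grammars and the like) the RD pulled in while parsing.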
def dumpPackage(rdId:str, destFile) -> None:
	"""writes a zip of the complete data package for a resource descriptor
	to destFile.

	destFile can be anything that zipfile.ZipFile accepts in w mode.
	"""
	with zipfile.ZipFile(destFile, "w") as dest:
		rd = getRDForDump(rdId)
		descriptor = makeDescriptor(rd)
		dest.writestr("datapackage.json", json.dumps(descriptor))

		for rsc in descriptor["resources"]:
			dest.write(
				os.path.join(rd.resdir, rsc["path"]),
				rsc["path"])

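# A usage sketch (hypothetical RD id and file name; the create command
# below does essentially this):
#
#   with open("myres-pack.zip", "wb") as f:
#       dumpPackage("myres/q", f)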
def getPackageMeta(packageName:str) -> dict:
	"""returns a dict of DaCHS-specific metadata items from a
	DaCHS-produced data package.
	"""
	res = {}
	try:
		with zipfile.ZipFile(packageName, "r") as archive:
			with archive.open("datapackage.json") as f:
				packageMeta = json.load(f)

		if "dachs-resdir" not in packageMeta:
			raise ValueError("Need dachs-resdir key in package meta")
		res["resdir"] = packageMeta["dachs-resdir"]

		try:
			res0 = packageMeta["resources"][0]
		except (IndexError, KeyError):
			raise ValueError("Data package without resources")
		if res0["name"]!="dachs-resource-descriptor":
			raise ValueError("First data package resource isn't a DaCHS RD")
		res["rdpath"] = res0["path"]

	except Exception as ex:
		raise base.ui.logOldExc(
			base.ReportableError("%s is not a data package produced by DaCHS"
				" (or, if it was, DaCHS is broken and you should"
				" complain)"%packageName,
				hint="Here is what failed in the background: %s"%ex))

	return res

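# For a package written by dumpPackage above, this might return
# (hypothetical values):
#
#   {"resdir": "myres", "rdpath": "q.rd"}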
@utils.exposedFunction([
		utils.Arg("id", help="Absolute RD id to produce a datapack for"),
		utils.Arg("dest", help="Name of a zip file to dump to")],
	help="Produce a frictionless data package for the RD and the data"
		" it imports.")
def create(args:argparse.Namespace):
	with open(args.dest, "wb") as dest:
		dumpPackage(args.id, dest)

@utils.exposedFunction([
		utils.Arg("source",
			help="Name of a DaCHS-produced data package zip file."),
		utils.Arg("-t", "--no-test",
			help="Do not run tests after importing.",
			dest="suppressTests", action="store_true"),
		utils.Arg("--force",
			help="If the resdir the package declares already exists,"
				" remove it before unpacking (rather than bailing out).",
			dest="removeResdir", action="store_true")],
	help="Load and import a data package.  This only works for data"
		" packages actually produced by DaCHS.")
def load(args:argparse.Namespace):
	packageMeta = getPackageMeta(args.source)
	absSource = os.path.abspath(args.source)
	os.chdir(base.getConfig("inputsDir"))

	if os.path.exists(packageMeta["resdir"]):
		if args.removeResdir:
			shutil.rmtree(packageMeta["resdir"])
		else:
			raise base.ReportableError(
				"Refusing to overwrite directory '%s'"%packageMeta["resdir"],
				hint="The data package is for a resource living in this"
					" resource directory, and it can only run there.  However,"
					" that directory already exists.  Move it away and try"
					" again.")

	base.makeSharedDir(packageMeta["resdir"])
	os.chdir(packageMeta["resdir"])
	subprocess.run(["unzip", absSource], check=True)

	subprocess.run(["dachs", "imp", packageMeta["rdpath"]], check=True)
	if not args.suppressTests:
		subprocess.run(["dachs", "test", "-v", packageMeta["rdpath"]])

def main():
	"""does the cli interaction.
	"""
	args = utils.makeCLIParser(globals()).parse_args()
	args.subAction(args)
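# A command-line sketch, assuming this module is wired up as a dachs
# subcommand called "datapack" (the names and files are hypothetical):
#
#   dachs datapack create myres/q myres-pack.zip
#   dachs datapack load myres-pack.zip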