# Source code for gavo.formats.common

"""
Common code for generation of various data formats.

The main function here is formatData.  It receives a string format id, a data
instance and a destination file (binary mode).  It dispatches this to
formatters previously registered using registerDataWriter.

The data writers must take a data instance and a file instance; their
effect must be that a serialized representation of data, or, if the format
does not support this, the data's primary table is written to the file
instance.
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import cgi
import io
import os
import mimetypes

from gavo import base
from gavo import utils


# Extension (lower case, with leading dot) -> media type.  guessMediaType
# consults this after the formats registry and before python's mimetypes
# module, so these entries also work when the module registering the
# respective format has not been imported.
EXTENSION_FALLBACKS = {
	".vot": base.votableType,
	".fits": "application/fits",
	".fz": "image/fits",
}

# image/fits is probably not quite legal (it's not in Debian's
# /etc/mime.types), but it's too handy to pass up.
# Note that guessMediaType looks at EXTENSION_FALLBACKS before it asks
# mimetypes, so .fits files are still reported as application/fits there;
# this registration is what lets mimetypes.guess_extension (as used in
# getExtensionFor) know about image/fits.
mimetypes.add_type("image/fits", ".fits")


class CannotSerializeIn(base.Error):
	"""is raised when no writer can be found for a requested format.

	The offending format identifier is kept in the ``format`` attribute
	and is the sole element of ``args``.  The hint passed on to base.Error
	lists the format codes registered at the time the exception is
	constructed.
	"""
	def __init__(self, format):
		self.format = format
		# Build the list of known formats at raise time; formats register
		# themselves on import, so this may differ between calls.
		knownFormats = ", ".join(FORMATS_REGISTRY.writerRegistry)
		hint = ("Either you gave an invalid format id or a known format"
			" did not get registered for some reason. Format codes"
			" known at this point: %s. You can also try common MIME types"
			)%(knownFormats)
		base.Error.__init__(self, format, hint=hint)
		self.args = [format]

	def __str__(self):
		return "Cannot serialize in '%s'."%self.format
def getMIMEKey(contentType):
	"""makes a DaCHS mime key from a content-type string.

	This is used for retrieving matching mime types and is a triple
	of major and minor mime type and a frozenset of (name, value) parameter
	pairs.

	contentType is a string-serialized mime type.  We also normalise
	everything to lower case.  I don't think that's quite standards-compliant,
	but with all the other case-insensitivity nonsense, anything else
	will get really ugly.

	A CannotSerializeIn (carrying the lower-cased contentType) is raised
	if the media type proper is not of the form major/minor.

	The header is parsed with email.message rather than cgi.parse_header,
	because the cgi module is deprecated since python 3.11 and gone in 3.13.
	For well-formed media types, the result is the same; for malformed
	parameter lists (e.g., parameters without an equals sign), details
	may differ.
	"""
	# NOTE(review): with this, nothing in this function needs the
	# module-level cgi import any more.
	from email.message import Message

	contentType = contentType.lower()

	header = Message()
	header["content-type"] = contentType
	# get_params returns [(mediaType, ""), (parName, parValue), ...] with
	# quotes already removed from the values.
	parsed = header.get_params()
	mediaType, paramPairs = parsed[0][0], parsed[1:]

	try:
		major, minor = mediaType.split("/")
	except (ValueError, TypeError):
		raise CannotSerializeIn(contentType)

	# going through a dict makes the last of several same-named parameters
	# win, as it did with cgi.parse_header.
	return (major, minor, frozenset(dict(paramPairs).items()))
class FORMATS_REGISTRY(object):
	"""the registry of data formats DaCHS can produce.

	Serialising modules enter themselves here when they are imported.
	Hence, if you depend on a particular entry, make sure the module
	providing it actually gets imported somewhere.

	All state lives in class attributes and all methods are class methods,
	so no instance is needed to use the registry.
	"""
	# format key -> writer function
	writerRegistry = {}
	# format key -> mime type
	formatToMIME = {}
	# format key -> human-readable label
	formatToLabel = {}
	# (major, minor, param pair set) -> format key
	mimeToKey = {}
	# extension -> format key (first registration wins)
	extensionToKey = utils.CaseSemisensitiveDict()
	# format key -> extension
	keyToExtension = {}
	# Formats TAPRegExt standard ids have an entry here
	keyToTAPId = {}
	# main format key to aliases also accepted
	keyToAliases = {}

	@classmethod
	def registerDataWriter(cls,
			key, writer, mainMime, label, extension, *aliases, tapId=None):
		"""enters a writer into the formats registry.

		Arguments:

		* key -- a short, unique handle for the format
		* writer -- a function(data, outputFile) -> None, where data can
		  be an rsc.Data or an rsc.Table instance
		* mainMime -- the preferred media type
		* label -- a human-readable designation for the format (for
		  selection widgets and the like)
		* extension -- a suggested extension for the format (lower-case only)
		* aliases -- further strings accepted for selecting the format in
		  DALI FORMAT or similar; they are parsed as media types
		* tapId -- if given, the TAPRegExt identifier of the format

		Where keys, mainMime, and aliases clash, previous entries are silently
		overwritten.  For extensions, the first registered format wins.
		"""
		cls.writerRegistry[key] = writer
		cls.formatToMIME[key] = mainMime
		cls.formatToLabel[key] = label

		# main type first, then the aliases in the order given
		for mediaType in (mainMime,)+aliases:
			cls.mimeToKey[getMIMEKey(mediaType)] = key

		if extension not in cls.extensionToKey:
			cls.extensionToKey[extension] = key
		cls.keyToExtension[key] = extension

		if tapId is not None:
			cls.keyToTAPId[key] = tapId

		# the key itself counts as an alias unless it is the main media type
		alternateNames = list(aliases)
		if mainMime!=key:
			alternateNames.append(key)
		cls.keyToAliases[key] = alternateNames

	@classmethod
	def getMIMEFor(cls, formatName, orderedFormat=None):
		"""returns a simple MIME type for formatName (a format key or
		some incoming MIME).

		If orderedFormat is one of a few magic, reserved media types that
		must be preserved from the input, it is returned unchanged.  This is
		for TAP and related DALI hacks.

		Unregistered names are returned as they are if they contain a slash;
		everything else becomes application/octet-stream.
		"""
		# TAP Spec, 2.7.1, similar in DALI, wants us to keep some
		# media types.  It's not quite clear which these actually are,
		# but I'd guess something like:
		preservedPrefixes = (
			"text/xml",
			"application/x-votable+xml",
			"text/plain")
		if orderedFormat and orderedFormat.startswith(preservedPrefixes):
			return orderedFormat

		if formatName in cls.formatToMIME:
			return cls.formatToMIME[formatName]

		# if it looks like a mime type, return it, otherwise assume it's
		# an unimported format and return a generic mime
		return formatName if "/" in formatName else "application/octet-stream"

	@classmethod
	def getWriterFor(cls, formatName):
		"""returns the writer registered for formatName.

		formatName is a MIME type or a format key; the writer is what was
		passed to registerDataWriter.

		A CannotSerializeIn is raised if there is no writer for formatName.
		"""
		formatKey = cls.getKeyFor(formatName)
		return cls.writerRegistry[formatKey]

	@classmethod
	def getLabelFor(cls, formatName):
		"""returns the human-readable label for formatName (DaCHS key or
		MIME type).
		"""
		formatKey = cls.getKeyFor(formatName)
		return cls.formatToLabel[formatKey]

	@classmethod
	def getKeyFor(cls, formatName):
		"""returns the DaCHS format key for formatName (DaCHS key or MIME).

		formatName is lower-cased before anything else happens.  For
		a mime type with parameters, a match with the parameters stripped
		is silently accepted if there is no match for the full type.

		A CannotSerializeIn is raised if nothing matches.
		"""
		normalised = formatName.lower()
		if normalised in cls.writerRegistry:
			return normalised

		fullKey = getMIMEKey(normalised)
		withoutParameters = fullKey[:2]+(frozenset(),)
		for candidate in (fullKey, withoutParameters):
			if candidate in cls.mimeToKey:
				return cls.mimeToKey[candidate]

		raise CannotSerializeIn(normalised)

	@classmethod
	def getAliasesFor(cls, formatName):
		"""returns the list of alternate names for a DaCHS format key.

		This is the registry's own list, so do not modify it.  It contains
		the DaCHS format key unless that is the main mime type itself.
		"""
		return cls.keyToAliases[formatName]

	@classmethod
	def getTAPIdFor(cls, formatName):
		"""returns the TAPRegExt ivoid registered for a DaCHS format key.

		None is returned for formats registered without such an id.
		"""
		return cls.keyToTAPId.get(formatName)

	@classmethod
	def getTypeForExtension(cls, extension):
		"""returns the media type of the format that first claimed extension.

		extension must begin with a dot.  For extensions no format
		has (yet) claimed, None is returned.
		"""
		formatKey = cls.extensionToKey.get(extension.lower())
		return None if formatKey is None else cls.formatToMIME[formatKey]

	@classmethod
	def iterFormats(cls):
		"""returns an iterator over the keys of the registered formats.
		"""
		return iter(cls.writerRegistry)
# Module-level shortcuts for the registry's class methods, so client code
# can call them without going through FORMATS_REGISTRY.
# (getTypeForExtension has no shortcut here.)
registerDataWriter = FORMATS_REGISTRY.registerDataWriter
getMIMEFor = FORMATS_REGISTRY.getMIMEFor
getKeyFor = FORMATS_REGISTRY.getKeyFor
getWriterFor = FORMATS_REGISTRY.getWriterFor
getLabelFor = FORMATS_REGISTRY.getLabelFor
getAliasesFor = FORMATS_REGISTRY.getAliasesFor
getTAPIdFor = FORMATS_REGISTRY.getTAPIdFor
iterFormats = FORMATS_REGISTRY.iterFormats
def formatData(
		formatName,
		table,
		outputFile,
		acquireSamples=True,
		**moreFormatterArgs):
	"""writes a table to outputFile in the format given by formatName.

	table may be a table or a ``Data`` instance.  ``formatName`` is a
	format shortcut (``formats.iterFormats()`` gives the keys available)
	or a media type.  Pass None to select the default VOTable format.

	outputFile is handed through to the writer together with
	acquireSamples and any extra keyword arguments.

	A ``CannotSerializeIn`` exception is raised if ``formatName`` is not
	recognized.  Note that you have to import the serialising modules
	from the format package to make the formats available (fitstable,
	csvtable, geojson, jsontable, texttable, votable; api itself already
	imports the more popular of these).

	If a client knows a certain formatter understands additional arguments,
	it can hand them in as keyword arguments.  This will raise an error if
	another formatter that doesn't understand the argument is being used.
	"""
	effectiveFormat = base.votableType if formatName is None else formatName
	writer = getWriterFor(effectiveFormat)
	writer(
		table,
		outputFile,
		acquireSamples=acquireSamples,
		**moreFormatterArgs)
def getFormatted(formatName, table, acquireSamples=False):
	"""returns a bytes object containing a representation of table in the
	format given by formatName.

	This is just wrapping the `function formatData`_; see there for
	formatName.  Note that acquireSamples defaults to False here.

	The whole serialisation is built up in memory, so this function will
	use large amounts of memory for large data.
	"""
	destination = io.BytesIO()
	formatData(formatName, table, destination, acquireSamples=acquireSamples)
	return destination.getvalue()
def guessMediaType(fName):
	"""returns a media type plausible for a file named fName.

	The sources consulted are, in this order:

	1. the extension map built up by our formats registry,
	2. EXTENSION_FALLBACKS, a built-in safety catch in case the formatters
	   haven't been imported,
	3. python's mimetypes.guess_type.

	If nothing matches, application/octet-stream is returned.

	Extensions are used case-insensitively.

	We don't do any encoding inference (yet).  We may, though, so by
	all means shout if you're using this in DaCHS-external code.
	"""
	extension = os.path.splitext(fName)[-1].lower()

	fromRegistry = FORMATS_REGISTRY.getTypeForExtension(extension)
	if fromRegistry is not None:
		return fromRegistry

	fromFallbacks = EXTENSION_FALLBACKS.get(extension)
	if fromFallbacks is not None:
		return fromFallbacks

	# the second item of guess_type's result is the encoding, which we
	# ignore for now
	fromStdlib, _ = mimetypes.guess_type(fName)
	if fromStdlib is not None:
		return fromStdlib

	return "application/octet-stream"
def getExtensionFor(mediaType):
	"""returns a suggested extension for files of mediaType.

	mediaType can be an RFC 2045 media type, or one of DaCHS' internal
	format codes.

	If our formats registry does not know mediaType, python's mimetypes
	module is asked; if that has no idea either, .dat is returned.
	"""
	try:
		formatKey = FORMATS_REGISTRY.getKeyFor(mediaType)
		return FORMATS_REGISTRY.keyToExtension[formatKey]
	except (CannotSerializeIn, KeyError):
		guessed = mimetypes.guess_extension(mediaType)
		return guessed or ".dat"