Source code for gavo.web.producttar

"""
Helper functions for producing tar files from tables containing
a product column.

Everything in this module expects the product interface, i.e., tables
must at least contain accref, owner, embargo, and accsize fields.
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


# XXX TODO: this should eventually become a renderer on the product core,
# redirected to from the current TarResponse.

import functools
import io
import os
import tarfile
import time

from gavo import base
from gavo import svcs
from gavo.protocols import products
from gavo.svcs import streaming


MS = base.makeStruct


[docs]class UniqueNameGenerator(object): """A factory to build unique names from possibly ambiguous ones. If the lower case of a name is not known to an instance, it just returns that name. Otherwise, it disambiguates by adding characters in front of the extension. """ def __init__(self): self.knownNames = set() def _buildNames(self, baseName): base, ext = os.path.splitext(baseName) yield "dc_data/%s%s"%(base, ext) i = 1 while True: yield "dc_data/%s-%03d%s"%(base, i, ext) i += 1
[docs] def makeName(self, baseName): for name in self._buildNames(baseName): if name.lower() not in self.knownNames: self.knownNames.add(name.lower()) return str(name)
[docs]class ProductTarMaker(object): """A factory for tar files. You probably don't want to instantiate it directly but instead get a copy through the getProductMaker function below. The main entry point to this class is deliverProductTar. """ def __init__(self): self.rd = base.caches.getRD("__system__/products") self.core = self.rd.getById("forTar") def _getEmbargoedFile(self, name): stuff = io.BytesIO(b"This file is embargoed. Sorry.\n") b = tarfile.TarInfo(name) b.size = len(stuff.getvalue()) b.mtime = time.time() return b, stuff def _getTarInfoFromProduct(self, prod, name): """returns a tar info from a general products.PlainProduct instance prod. This is relatively inefficient for data that's actually on disk, so you should only use it when data is being computed on the fly. """ assert not isinstance(prod, products.UnauthorizedProduct) data = b"".join(prod.iterData()) b = tarfile.TarInfo(name) b.size = len(data) b.mtime = time.time() return b, io.BytesIO(data) def _getDestName(self, productsTable): """returns a filename for a tar with the stuff in productsTable. For now, we just distinguish overflowed and non-overflowed tars. """ qs = base.getMetaText(productsTable, "_queryStatus") if qs=="OVERFLOW": return "truncated_data.tar" else: return "data.tar" def _productsToTar(self, productList, destination): """actually writes the tar. """ nameGen = UniqueNameGenerator() outputTar = tarfile.TarFile.open("data.tar", "w|", destination) for prodRec in productList: src = prodRec if isinstance(src, products.NonExistingProduct): continue # just skip files that somehow don't exist any more elif isinstance(src, products.UnauthorizedProduct): outputTar.addfile(*self._getEmbargoedFile(src.name)) elif isinstance(src, products.FileProduct): # actual file in the file system targetName = nameGen.makeName(src.name) outputTar.add(str(src.rAccref.localpath), targetName) else: # anything else is read from the src outputTar.addfile(*self._getTarInfoFromProduct(src, nameGen.makeName(src.name))) outputTar.close() return b"" # finish off request if necessary. def _streamOutTar(self, productData, request, queryMeta, destName): request.setHeader('content-disposition', 'attachment; filename=%s'%destName) request.setHeader("content-type", "application/x-tar") def writeTar(dest): self._productsToTar(productData, dest) return streaming.streamOut(writeTar, request, queryMeta)
[docs] def deliverProductTar(self, coreResult, request, queryMeta): """causes a tar containing all accrefs mentioned in coreResult to be streamed out via request. """ table = coreResult.getPrimaryTable() productColumns = products.getProductColumns(table.tableDef.columns) if not productColumns: raise base.ValidationError("This query does not select any" " columns with access references", "_OUTPUT") accrefs = [] for row in table: for col in productColumns: accrefs.append(row[col.name]) inputTable = svcs.CoreArgs.fromRawArgs( self.rd.getById("forTar").inputTable, {"accref": accrefs}) prods = self.core.run(self.rd.getById("getTar"), inputTable, queryMeta) return self._streamOutTar(prods, request, queryMeta, self._getDestName(table))
[docs]@functools.lru_cache(1) def getTarMaker(): return ProductTarMaker()