Source code for gavo.rsc.dumping

"""
DaCHS supports dump/restore operations on tables or sets of tables.
This module implements the underlying file format and some utilities.

The file format itself is a tar.gz file with an index.txt consisting of lines

file_name table-id

followed by the files.  Table id is DaCHS's usual rd-id#table-id.
The files contain binary dump material.
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import os
import sys
import time
import tarfile
from io import BytesIO

from gavo import base
from gavo import rscdef
from gavo import utils
from gavo.rsc import tables


def parseIndexFile(srcFile):
    """parses our index.txt file format and yields [member-name, table-id]
    pairs.

    srcFile is an open binary file, probably from TarFile.extractfile.
    Each line is decoded as utf-8 and split on whitespace.

    Blank lines are skipped.  A non-blank line that does not consist of
    exactly two whitespace-separated items raises a ValueError.

    If you change the index file format, you'll have to change this and
    the corresponding code in createDump.
    """
    for lineNumber, line in enumerate(srcFile, 1):
        parts = line.decode("utf-8").split()
        if not parts:
            # tolerate empty lines, e.g., a trailing newline in the index
            continue

        if len(parts)!=2:
            # this validates external input, so it must not be an assert
            # (which would vanish under python -O)
            raise ValueError("Malformed dump index line %d: %r"%(
                lineNumber, line))

        yield parts
def iterDbTables(objectId, connection):
    """iterates over dbtable objects referenced by objectId.

    A string objectId is resolved through base.resolveCrossId.  When that
    gives a table def, a single table for it is yielded.  When it gives
    something with a tables attribute (taken to be an RD), tables for all
    its onDisk, non-view table defs are yielded.  Anything else raises a
    ReportableError.

    A non-string objectId is taken to be a database table already and
    is yielded unchanged.
    """
    if not isinstance(objectId, str):
        # let's believe it's a database table
        # (perhaps check that tableId.tableDef is good?)
        yield objectId
        return

    resolved = base.resolveCrossId(objectId)

    if isinstance(resolved, rscdef.TableDef):
        yield tables.TableForDef(resolved, connection=connection)

    elif not hasattr(resolved, "tables"):
        raise base.ReportableError("Can only dump by table or RD,"
            " but %s is neither"%objectId)

    else:
        # let's assume it's an RD
        for tableDef in resolved.tables:
            if not tableDef.onDisk or tableDef.viewStatement:
                continue
            yield tables.TableForDef(tableDef, connection=connection)
def getTablesForIds(tableIds, connection):
    """returns a list of validated dbtables of all tableIds.

    Each item of tableIds is expanded through iterDbTables, and every
    resulting table has its ensureOnDiskMatches method called before it
    is added to the result.  Hence, this will raise an exception if any
    table id or database table doesn't exist, or if the on-disk schema
    doesn't match the definition.

    For convenience in internal use, tableIds that already are table
    instances will not be touched.  That's a bit tricky, though, because
    you can have data from different transactions when you do that.
    """
    validated = []

    for ref in tableIds:
        candidates = iterDbTables(ref, connection)
        for dbtable in candidates:
            dbtable.ensureOnDiskMatches()
            validated.append(dbtable)

    return validated
def createDump(tableIds, destFile, binary=True):
    """writes a DaCHS dump of tableIds to destFile.

    tableIds is a list of rd-id#table-id identifiers (all must resolve),
    destFile is a file object opened for writing; a gzipped tar is written
    to it, so it has to accept bytes.

    With binary true (the default), archive members are named
    table_NNN.dump, otherwise table_NNN.textdump; the flag is also passed
    on to each table's copyOut.

    When dumping a single table fails, that is reported through
    base.ui.notifyError and the table is left out of index.txt; the
    remaining tables are still dumped.
    """
    namePattern = "table_%03d.dump" if binary else "table_%03d.textdump"

    with base.getTableConn() as connection:
        toDump = getTablesForIds(tableIds, connection)
        dumped = []

        # the with block makes sure the archive object is released even
        # when writing to destFile fails.
        with tarfile.open(fileobj=destFile, mode="w:gz") as destTar:
            for index, curTable in enumerate(toDump):
                try:
                    dumpedBytes = BytesIO()
                    curTable.copyOut(dumpedBytes, binary=binary)
                    dumpedBytes.seek(0)

                    curInfo = tarfile.TarInfo(namePattern%index)
                    curInfo.mtime = time.time()
                    # getbuffer gives the size without copying the
                    # (possibly large) dump as getvalue would.
                    curInfo.size = dumpedBytes.getbuffer().nbytes
                    destTar.addfile(curInfo, dumpedBytes)
                    dumped.append(
                        (curInfo.name, curTable.tableDef.getFullId()))
                except Exception as msg:
                    # deliberately broad: one broken table must not
                    # keep the others from being dumped.
                    base.ui.notifyError("Dumping %s failed: %s"%(
                        curTable.tableDef.getFullId(),
                        utils.safe_str(msg)))

            # the index is written last and only lists what actually made
            # it into the archive; parseIndexFile reads this format.
            indexText = ("\n".join(
                "%s %s"%d for d in dumped)).encode("utf-8")
            curInfo = tarfile.TarInfo("index.txt")
            curInfo.mtime = time.time()
            curInfo.size = len(indexText)
            destTar.addfile(curInfo, BytesIO(indexText))
def iterTableInfos(dumpFile):
    """iterates over table info tuples from an open dump file.

    dumpFile is a binary file object containing a gzipped tar as written
    by createDump.

    Each tuple has the member name, the table id, a boolean whether the
    table definition is accessible (i.e., whether base.resolveCrossId finds
    a table def for the id), the UTC unix time the dump was made, and the
    size of the dump.

    The archive object is closed when the iterator is exhausted or
    closed; dumpFile itself is left open.
    """
    with tarfile.open(fileobj=dumpFile, mode="r:gz") as tf:
        for memberName, tableId in parseIndexFile(
                tf.extractfile("index.txt")):
            memberInfo = tf.getmember(memberName)

            try:
                base.resolveCrossId(tableId, forceType=rscdef.TableDef)
            except base.NotFoundError:
                tdExists = False
            else:
                tdExists = True

            yield (memberName,
                tableId,
                tdExists,
                memberInfo.mtime,
                memberInfo.size)
def restoreDump(dumpFile):
    """restores a dump.

    dumpFile is an open file object containing a file created by
    createDump.  It is read twice (once for the index, once for the
    data) and hence must be seekable.

    This comprises recreating all mentioned tables, copying in the
    associated data, and re-creating all indices.  Tables for which no
    table definition can be found are skipped with a warning.

    Each table is handled in a separate transaction, we do not stop
    if a single restore has failed; failures are reported through
    base.ui.notifyError.
    """
    toDo = list(iterTableInfos(dumpFile))
    # iterTableInfos has consumed the file; rewind to read the members
    dumpFile.seek(0)

    with tarfile.open(fileobj=dumpFile, mode="r:gz") as tf:
        with base.getWritableAdminConn() as connection:
            for memberName, tdId, tdExists, _, _ in toDo:
                if not tdExists:
                    base.ui.notifyWarning(
                        "Skipping restore of undefined table %s"%tdId)
                    continue

                try:
                    table = tables.TableForDef(
                        base.resolveCrossId(tdId, forceType=rscdef.TableDef),
                        connection=connection)
                    table.recreate()
                    # member names ending in .dump are binary dumps,
                    # anything else (.textdump) is text
                    table.copyIn(tf.extractfile(memberName),
                        binary=memberName.endswith(".dump"))
                    table.makeIndices()
                except Exception as msg:
                    # roll back on the connection we passed to the table
                    # rather than through table: table is unbound (or
                    # still the previous one) if resolving or constructing
                    # it is what failed.
                    connection.rollback()
                    base.ui.notifyError("Restore of %s failed: %s"%(
                        tdId, utils.safe_str(msg)))
                else:
                    connection.commit()
################# CLI functions
@utils.exposedFunction([
    utils.Arg("-t", "--text", help="Dump text rather than binary",
        dest="textFormat", action="store_true"),
    utils.Arg("dumpFile", help="Name of a file to write the dump to; use - to"
        " dump to stdout."),
    utils.Arg("ids", help="ids of table definitions (as in myres/q#main)"
        " or RDs to dump.", nargs="+")],
    help="Dump one or more tables to DaCHS' dump format.")
def create(args):
    # CLI entry point: dumps the tables or RDs in args.ids to args.dumpFile
    # (or to standard output if that is "-"), as text if args.textFormat
    # is set and binary otherwise.
    toStdout = args.dumpFile=="-"

    if toStdout:
        # the dump is a gzipped tar, i.e., bytes; hence, we need the binary
        # stream underneath the text-mode sys.stdout.
        dumpTo = getattr(sys.stdout, "buffer", sys.stdout)
    else:
        dn = os.path.dirname(args.dumpFile)
        if dn:
            utils.ensureDir(dn)
        dumpTo = open(args.dumpFile, "wb")

    try:
        createDump(args.ids, dumpTo, not args.textFormat)
    finally:
        dumpTo.flush()
        # standard output is not ours to close
        if not toStdout:
            dumpTo.close()
@utils.exposedFunction([
    utils.Arg("source", help="File to restore from.  Use - to restore from stdin.")],
    help="Restore one or more table(s) from a file created by the create"
    " subcommand before")
def load(args):
    # CLI entry point: restores the dump in args.source (or the one read
    # from standard input if that is "-") through restoreDump.
    if args.source=="-":
        # restoreDump needs bytes and rewinds its input, neither of
        # which the text-mode, possibly piped sys.stdin can deliver.
        # So, we pull the whole dump into memory.
        loadFrom = BytesIO(getattr(sys.stdin, "buffer", sys.stdin).read())
    else:
        loadFrom = open(args.source, "rb")

    # closes the source even when the restore raises; for stdin, this
    # only closes our in-memory copy.
    with loadFrom:
        restoreDump(loadFrom)
@utils.exposedFunction([
    utils.Arg("source", help="File to list")],
    help="List tables and dump metadata from a DaCHS dump.")
def ls(args):
    # CLI entry point: writes a table with one row per table in the dump
    # args.source to standard output, giving table id, whether a table
    # definition for it was found, the UTC dump time, and the dump size.
    import datetime

    def formatUTC(mtime):
        # utcfromtimestamp is deprecated as of python 3.12; converting
        # with an explicit UTC zone and then dropping the tzinfo gives
        # the same naive ISO string without a +00:00 suffix.
        return datetime.datetime.fromtimestamp(
            mtime, datetime.timezone.utc).replace(tzinfo=None).isoformat()

    with open(args.source, "rb") as f:
        data = [
            (tdId,
                "probably" if exists else "no",
                formatUTC(mtime),
                size)
            for _, tdId, exists, mtime, size in iterTableInfos(f)]

    sys.stdout.write(utils.formatSimpleTable(
        data,
        titles="table restorable? dumped size".split())+"\n")
def main():
    """runs the command line interface.

    This builds a parser through utils.makeCLIParser from this module's
    globals, parses the command line, and calls the subAction found in
    the parsed arguments, passing it those arguments.
    """
    from gavo import rscdesc #noflake: for registration

    parser = utils.makeCLIParser(globals())
    parsed = parser.parse_args()
    parsed.subAction(parsed)