
Source Code for Module gavo.rsc.dumping

"""
DaCHS supports dump/restore operations on tables or sets of tables.
This module implements the underlying file format and some utilities.

The file format itself is a tar.gz file with an index.txt consisting of lines

file_name table-id

followed by the files.  The table id is DaCHS's usual rd-id#table-id.
The files contain binary dump material.
"""
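
# For illustration, the index.txt inside a dump might contain lines like the
# following (member and table names here are made up):
#
#     table_000.dump myres/q#main
#     table_001.dump myres/q#extras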

#c Copyright 2008-2019, the GAVO project
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import os
import sys
import time
import tarfile
from io import BytesIO

from gavo import base
from gavo import rscdef
from gavo import utils
from gavo.rsc import tables


def parseIndexFile(srcFile):
    """parses our index.txt file format and returns (member-name, table-id)
    tuples.

    srcFile is an open file, probably from TarFile.extractfile.

    If you change the index file format, you'll have to change this
    and the corresponding code in createDump.
    """
    for line in srcFile:
        parts = line.strip().split()
        assert len(parts)==2
        yield parts
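
# A rough sketch of reading the index by hand, outside of this module (the
# dump name is made up; see createDump below for how such files are written):
#
#     tf = tarfile.open("mydata.dump", mode="r:gz")
#     for memberName, tableId in parseIndexFile(tf.extractfile("index.txt")):
#         sys.stdout.write("%s %s\n"%(memberName, tableId))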


def iterDbTables(objectId, connection):
    """iterates over dbtable objects referenced by objectId.

    objectId can reference a table def or an RD (in which case all
    onDisk tables from it are returned).  Or it can be a table already.
    """
    if not isinstance(objectId, basestring):
        # let's believe it's a database table
        # (perhaps check that tableId.tableDef is good?)
        yield objectId

    else:
        obj = base.resolveCrossId(objectId)
        if isinstance(obj, rscdef.TableDef):
            yield tables.TableForDef(obj, connection=connection)

        elif hasattr(obj, "tables"):  # let's assume it's an RD
            for td in obj.tables:
                if td.onDisk and not td.viewStatement:
                    yield tables.TableForDef(td, connection=connection)

        else:
            raise base.ReportableError("Can only dump by table or RD,"
                " but %s is neither"%objectId)
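
# iterDbTables accepts three kinds of objectId; roughly (the ids and the
# connection here are made up):
#
#     iterDbTables("myres/q#main", conn)     # a single table definition
#     iterDbTables("myres/q", conn)          # an RD: all its onDisk tables
#     iterDbTables(someDbTable, conn)        # a table object is passed through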


def getTablesForIds(tableIds, connection):
    """returns a list of validated dbtables for all tableIds.

    This will raise an exception if any table id or database table doesn't
    exist, or if the on-disk schema doesn't match the definition.

    For convenience in internal use, tableIds that already are table
    instances will not be touched.  That's a bit tricky, though, because
    you can have data from different transactions when you do that.
    """
    dbtables = []
    for tableId in tableIds:
        for table in iterDbTables(tableId, connection):
            table.ensureOnDiskMatches()
            dbtables.append(table)
    return dbtables
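
# getTablesForIds is what createDump uses to turn ids into live, validated
# table objects; a sketch (the ids are made up):
#
#     with base.getTableConn() as conn:
#         dbtables = getTablesForIds(["myres/q#main", "otherres/q"], conn)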


def createDump(tableIds, destFile):
    """writes a DaCHS dump of tableIds to destFile.

    tableIds is a list of rd-id#table-id identifiers (all must resolve),
    destFile is a file object opened for writing.
    """
    with base.getTableConn() as connection:
        toDump = getTablesForIds(tableIds, connection)
        destTar = tarfile.open(fileobj=destFile, mode="w:gz")
        dumped = []

        for index, curTable in enumerate(toDump):
            try:
                dumpedBytes = BytesIO()
                curTable.copyOut(dumpedBytes)
                dumpedBytes.seek(0)

                curInfo = tarfile.TarInfo("table_%03d.dump"%index)
                curInfo.mtime = time.time()
                curInfo.size = len(dumpedBytes.getvalue())
                destTar.addfile(curInfo, dumpedBytes)

                dumped.append((curInfo.name, curTable.tableDef.getFullId()))
            except Exception as msg:
                base.ui.notifyError("Dumping %s failed: %s"%(
                    curTable.tableDef.getFullId(),
                    utils.safe_str(msg)))

        indexText = "\n".join(
            "%s %s"%d for d in dumped)
        curInfo = tarfile.TarInfo("index.txt")
        curInfo.mtime = time.time()
        curInfo.size = len(indexText)
        destTar.addfile(curInfo, BytesIO(str(indexText)))

        destTar.close()
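
# A minimal usage sketch for createDump (the file name and table id are
# assumptions, not part of the module):
#
#     with open("mydata.dump", "wb") as destFile:
#         createDump(["myres/q#main"], destFile)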


def iterTableInfos(dumpFile):
    """iterates over table info tuples from an open dump file.

    Each tuple has the member name, the table id, a boolean whether
    the table definition is accessible, the UTC unix time the dump
    was made, and the size of the dump.
    """
    tf = tarfile.open(fileobj=dumpFile, mode="r:gz")
    for memberName, tableId in parseIndexFile(tf.extractfile("index.txt")):
        memberInfo = tf.getmember(memberName)
        tdExists = True
        try:
            base.resolveCrossId(tableId, forceType=rscdef.TableDef)
        except base.NotFoundError:
            tdExists = False

        yield (memberName, tableId, tdExists, memberInfo.mtime, memberInfo.size)
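
# Sketch: inspecting a dump's contents (again with a made-up file name):
#
#     with open("mydata.dump", "rb") as f:
#         for memberName, tableId, tdExists, mtime, size in iterTableInfos(f):
#             sys.stdout.write("%s %s bytes\n"%(tableId, size))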


def restoreDump(dumpFile):
    """restores a dump.

    dumpFile is an open file object containing a file created by createDump.

    This comprises re-creating all mentioned tables, copying in the
    associated data, and re-creating all indices.

    Each table is handled in a separate transaction; we do not stop if a
    single restore fails.
    """
    toDo = list(iterTableInfos(dumpFile))
    dumpFile.seek(0)
    tf = tarfile.open(fileobj=dumpFile, mode="r:gz")

    with base.getWritableAdminConn() as connection:
        for memberName, tdId, tdExists, _, _ in toDo:
            if not tdExists:
                base.ui.notifyWarning("Skipping restore of undefined table %s"%tdId)
                continue

            try:
                table = tables.TableForDef(
                    base.resolveCrossId(tdId, forceType=rscdef.TableDef),
                    connection=connection)
                table.recreate()
                table.copyIn(tf.extractfile(memberName))
                table.makeIndices()
            except Exception as msg:
                table.connection.rollback()
                base.ui.notifyError("Restore of %s failed: %s"%(
                    tdId, utils.safe_str(msg)))
            else:
                table.connection.commit()
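
# Sketch: restoring such a dump into the database (file name made up):
#
#     with open("mydata.dump", "rb") as dumpFile:
#         restoreDump(dumpFile)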


################# CLI functions

@utils.exposedFunction([
    utils.Arg("dumpFile", help="Name of a file to write the dump to; use - to"
        " dump to stdout."),
    utils.Arg("ids", help="ids of table definitions (as in myres/q#main)"
        " or RDs to dump.", nargs="+")],
    help="Dump one or more tables to DaCHS' dump format.")
def create(args):
    if args.dumpFile=="-":
        dumpTo = sys.stdout
    else:
        dn = os.path.dirname(args.dumpFile)
        if dn:
            utils.ensureDir(dn)
        dumpTo = open(args.dumpFile, "w")

    try:
        createDump(args.ids, dumpTo)
    finally:
        dumpTo.flush()

@utils.exposedFunction([
    utils.Arg("source",
        help="File to restore from. Use - to restore from stdin.")],
    help="Restore one or more tables from a file previously created by the"
        " create subcommand.")
def load(args):
    if args.source=="-":
        loadFrom = sys.stdin
    else:
        loadFrom = open(args.source)
    restoreDump(loadFrom)


@utils.exposedFunction([
    utils.Arg("source", help="File to list")],
    help="List tables and dump metadata from a DaCHS dump.")
def ls(args):
    import datetime

    with open(args.source) as f:
        data = []
        for _, tdId, exists, mtime, size in iterTableInfos(f):
            data.append((
                tdId,
                "probably" if exists else "no",
                datetime.datetime.utcfromtimestamp(mtime).isoformat(),
                size))
        sys.stdout.write(utils.formatSimpleTable(
            data,
            titles="table restorable? dumped size".split())+"\n")


def main():
    """does the user interaction.
    """
    from gavo import rscdesc #noflake: for registration
    args = utils.makeCLIParser(globals()).parse_args()
    args.subAction(args)
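
# The exposed functions above are normally reached through DaCHS' command
# line machinery rather than called directly; depending on how this module
# is registered there, an invocation would look roughly like (ids made up):
#
#     dachs dump create all.dump myres/q#main otherres/q
#     dachs dump ls all.dump
#     dachs dump load all.dump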