1 """
2 DaCHS supports dump/restore operations on tables or sets of tables.
3 This module implements the underlying file format and some utilities.
4
5 The file format itself is a tar.gz file with an index.txt consisting of lines
6
7 file_name table-id
8
9 followed by the files. Table id is DaCHS's usual rd-id#table-id.
10 The files contain binary dump material.
11 """
12
13
14
15
16
17
18
19 import os
20 import sys
21 import time
22 import tarfile
23 from io import BytesIO
24
25 from gavo import base
26 from gavo import rscdef
27 from gavo import utils
28 from gavo.rsc import tables
def parseIndexFile(srcFile):
    """parses our index.txt file format and yields (member-name, table-id)
    tuples.

    srcFile is an open file, probably from TarFile.extractfile; anything
    that iterates over lines will do.  Lines may be byte strings (which
    are decoded as UTF-8) or native strings.  Blank lines are skipped.

    A ValueError is raised for a line that does not consist of exactly
    two whitespace-separated items.

    If you change the index file format, you'll have to change this
    and the corresponding code in createDump.
    """
    for lineNumber, line in enumerate(srcFile, 1):
        if not isinstance(line, str):
            # TarFile.extractfile hands out bytes on Python 3; member names
            # and table ids are needed as native strings further down.
            line = line.decode("utf-8")

        parts = line.split()
        if not parts:
            continue

        if len(parts)!=2:
            # an explicit raise rather than an assert: this validates
            # external input and must not vanish under python -O.
            raise ValueError("Malformed dump index line %d: %r"%(
                lineNumber, line))
        yield tuple(parts)
44
try:
    _STRING_TYPES = basestring  # Python 2: covers str and unicode
except NameError:
    _STRING_TYPES = str  # Python 3 has no basestring


def iterDbTables(objectId, connection):
    """iterates over dbtable objects referenced by objectId.

    objectId can reference a table def or an RD (in which case all
    onDisk tables from it that have no viewStatement are returned).
    Anything that is not a string is taken to be a table already and is
    yielded unchanged.

    connection is handed on to tables.TableForDef for all tables created
    here.

    A base.ReportableError is raised when objectId resolves to something
    that is neither a table definition nor has a tables attribute.
    """
    if not isinstance(objectId, _STRING_TYPES):
        yield objectId
        return

    obj = base.resolveCrossId(objectId)
    if isinstance(obj, rscdef.TableDef):
        yield tables.TableForDef(obj, connection=connection)

    elif hasattr(obj, "tables"):
        # anything carrying a tables attribute is treated as an RD
        for td in obj.tables:
            if td.onDisk and not td.viewStatement:
                yield tables.TableForDef(td, connection=connection)

    else:
        raise base.ReportableError("Can only dump by table or RD,"
            " but %s is neither"%objectId)
70
def getTablesForIds(tableIds, connection):
    """returns a list of validated dbtables for all tableIds.

    Each id is expanded through iterDbTables, and every resulting table
    has its ensureOnDiskMatches method called before it enters the
    result; whatever either of these raises is passed on to the caller.

    For convenience in internal use, tableIds that already are table
    instances are used as they are.  That's a bit tricky, though, because
    you can have data from different transactions when you do that.
    """
    def iterValidated():
        # validate each table right when it is produced, before the
        # next one is requested from iterDbTables
        for ident in tableIds:
            for dbtable in iterDbTables(ident, connection):
                dbtable.ensureOnDiskMatches()
                yield dbtable

    return list(iterValidated())
88
def createDump(tableIds, destFile):
    """writes a DaCHS dump of tableIds to destFile.

    tableIds is a list of rd-id#table-id identifiers (all must resolve),
    destFile is a file object opened for writing binary data.

    The archive receives one table_NNN.dump member per table dumped and,
    as its last member, an index.txt mapping member names to table ids
    (see parseIndexFile).  A table that fails to dump is reported through
    base.ui.notifyError and left out of the index; dumping then continues
    with the next table.
    """
    with base.getTableConn() as connection:
        toDump = getTablesForIds(tableIds, connection)
        dumped = []

        # the context manager closes the tar stream even when writing
        # fails part way through.
        with tarfile.open(fileobj=destFile, mode="w:gz") as destTar:
            for index, curTable in enumerate(toDump):
                try:
                    dumpedBytes = BytesIO()
                    curTable.copyOut(dumpedBytes)

                    curInfo = tarfile.TarInfo("table_%03d.dump"%index)
                    curInfo.mtime = time.time()
                    # measure the dump by seeking; getvalue() would copy
                    # the whole dump just to take its length.
                    dumpedBytes.seek(0, os.SEEK_END)
                    curInfo.size = dumpedBytes.tell()
                    dumpedBytes.seek(0)
                    destTar.addfile(curInfo, dumpedBytes)

                    dumped.append(
                        (curInfo.name, curTable.tableDef.getFullId()))
                except Exception as msg:
                    base.ui.notifyError("Dumping %s failed: %s"%(
                        curTable.tableDef.getFullId(),
                        utils.safe_str(msg)))

            indexBytes = "\n".join(
                "%s %s"%d for d in dumped)
            if not isinstance(indexBytes, bytes):
                # the tar member needs bytes, and its size must be the
                # encoded length, not the number of characters.
                indexBytes = indexBytes.encode("utf-8")

            curInfo = tarfile.TarInfo("index.txt")
            curInfo.mtime = time.time()
            curInfo.size = len(indexBytes)
            destTar.addfile(curInfo, BytesIO(indexBytes))
126
def iterTableInfos(dumpFile):
    """iterates over table info tuples from an open dump file.

    Each tuple has the member name, the table id, a boolean whether
    the table definition is accessible, the UTC unix time the dump
    was made (taken from the member's mtime), and the size of the dump.

    dumpFile is a file object with a gzipped tar as written by
    createDump.  The tar reader opened on it is closed once the
    iteration is finished or abandoned.
    """
    with tarfile.open(fileobj=dumpFile, mode="r:gz") as tf:
        for memberName, tableId in parseIndexFile(
                tf.extractfile("index.txt")):
            memberInfo = tf.getmember(memberName)

            try:
                base.resolveCrossId(tableId, forceType=rscdef.TableDef)
                tdExists = True
            except base.NotFoundError:
                tdExists = False

            yield (memberName, tableId, tdExists,
                memberInfo.mtime, memberInfo.size)
145
def restoreDump(dumpFile):
    """restores a dump.

    dumpFile is an open file object containing a file created by createDump.
    It must be seekable, as it is read twice: once for the index, once
    for the table data.

    This comprises recreating all mentioned tables, copying in the associated
    data, and re-creating all indices.  Tables without an accessible
    table definition are skipped with a warning.

    Each table is handled in a separate transaction, we do not stop if a single
    restore has failed.
    """
    toDo = list(iterTableInfos(dumpFile))
    dumpFile.seek(0)
    tf = tarfile.open(fileobj=dumpFile, mode="r:gz")

    try:
        with base.getWritableAdminConn() as connection:
            for memberName, tdId, tdExists, _, _ in toDo:
                if not tdExists:
                    base.ui.notifyWarning(
                        "Skipping restore of undefined table %s"%tdId)
                    continue

                # reset per iteration so the error handler can tell whether
                # this round's table was ever created.
                table = None
                try:
                    table = tables.TableForDef(
                        base.resolveCrossId(tdId, forceType=rscdef.TableDef),
                        connection=connection)
                    table.recreate()
                    table.copyIn(tf.extractfile(memberName))
                    table.makeIndices()
                except Exception as msg:
                    # table is still None when resolving the id or building
                    # the table failed; roll back the connection we passed
                    # in then.
                    failedConn = (connection if table is None
                        else table.connection)
                    failedConn.rollback()
                    base.ui.notifyError("Restore of %s failed: %s"%(
                        tdId, utils.safe_str(msg)))
                else:
                    table.connection.commit()
    finally:
        tf.close()
181
182
183
184
@utils.exposedFunction([
    utils.Arg("dumpFile", help="Name of a file to write the dump to; use - to"
        " dump to stdout."),
    utils.Arg("ids", help="ids of table definitions (as in myres/q#main)"
        " or RDs to dump.", nargs="+")],
    help="Dump one or more tables to DaCHS' dump format.")
def create(args):
    """dumps the tables or RDs named in args.ids to args.dumpFile.

    A dumpFile of - writes the dump to stdout.  Otherwise, the directory
    part of dumpFile (if any) is passed to utils.ensureDir before the
    file is opened for writing; the file is closed again when the dump
    is done or has failed.
    """
    if args.dumpFile=="-":
        # the dump is a gzipped tar, i.e., binary; on Python 3 the byte
        # stream behind the text stdout is its buffer attribute.
        dumpTo = getattr(sys.stdout, "buffer", sys.stdout)
        closeWhenDone = False
    else:
        dn = os.path.dirname(args.dumpFile)
        if dn:
            utils.ensureDir(dn)
        dumpTo = open(args.dumpFile, "wb")
        closeWhenDone = True

    try:
        createDump(args.ids, dumpTo)
    finally:
        dumpTo.flush()
        if closeWhenDone:
            dumpTo.close()
205 @utils.exposedFunction([
206 utils.Arg("source",
207 help="File to restore from. Use - to restore from stdin.")],
208 help="Restore one or more table(s) from a file created by the create"
209 " subcommand before")
216
217
218 @utils.exposedFunction([
219 utils.Arg("source", help="File to list")],
220 help="List tables and dump metadata from a DaCHS dump.")
235
def main():
    """does the user interaction.

    A command line parser is built from this module's globals, the
    command line is parsed, and the subAction of the parse result is
    called with the parsed arguments.
    """
    # rscdesc is not referenced below; presumably it is imported for its
    # side effects only -- confirm before removing.
    from gavo import rscdesc
    parser = utils.makeCLIParser(globals())
    parsedArgs = parser.parse_args()
    parsedArgs.subAction(parsedArgs)
243