1 """
2 An abstract processor and some helping code.
3
4 Currently, I assume a plain text interface for those. It might be
5 a good idea to use the event mechanism here.
6 """
7
8
9
10
11
12
13
14 from __future__ import print_function
15
16 from cStringIO import StringIO
17 import os
18 import sys
19 import textwrap
20 import traceback
21
22 import matplotlib
23
24
25 matplotlib.use("Agg")
26
27 from PIL import Image
28
29 from gavo import base
30 from gavo import rsc
31 from gavo import utils
32 from gavo.helpers import anet
33 from gavo.helpers import fitstricks
34 from gavo.utils import fitstools
35 from gavo.utils import pyfits
36
37 pyplot = utils.DeferredImport("pyplot", "from matplotlib import pyplot")
41 """is raised when no FITS header was generated by a HeaderProcessor.
42
43 Specifically, this is what gets raised when ``_getHeader`` returns ``None``.
44 """
45
48 """An abstract base for a source file processor.
49
50 In concrete classes, you need to define a ``process(fName)`` method
51 receiving a source as returned by the dd (i.e., usually a file name).
52
53 You can override the method ``_createAuxiliaries(dataDesc)`` to compute
54 things like source catalogues, etc. Thus, you should not need to
55 override the constructor.
56
57 These objects are usually constructed thorough ``api.procmain`` as
58 discussed in :dachsdoc:`processing.html`.
59 """
60 inputsDir = base.getConfig("inputsDir")
61

    def __init__(self, opts, dd):
        self.opts, self.dd = opts, dd
        self._createAuxiliaries(dd)

    def _createAuxiliaries(self, dd):
        # hook for subclasses; the default calls a legacy, misspelled
        # _createAuxillaries if a subclass still defines one.
        if hasattr(self, "_createAuxillaries"):
            self._createAuxillaries(dd)

    def classify(self, fName):
        """returns a label for fName, used by the --report mode.

        Override this; the default puts everything into one class.
        """
        return "unknown"

    def process(self, fName):
        """override to do the actual work on fName.
        """
        pass

    def addClassification(self, fName):
        # used in place of process in --report mode
        label = self.classify(fName)
        self.reportDict.setdefault(label, []).append(os.path.basename(fName))

    def printTableSize(self):
        # NOTE: reconstructed body; report how many rows the data
        # descriptor's table currently has, ignoring database errors.
        try:
            tableName = self.dd.makes[0].table.getQName()
            with base.getTableConn() as conn:
                itemsInTable = list(conn.query(
                    "select count(*) from %s"%tableName))[0][0]
            print("Table %s: %s rows"%(tableName, itemsInTable))
        except base.DBError:
            pass

    def printReport(self, processed, ignored):
92 print("\n\nProcessor Report\n================\n")
93 if ignored:
94 print("Warning: There were %d errors during classification"%ignored)
95 repData = sorted(zip(map(len, self.reportDict.values()),
96 self.reportDict.keys()))
97 print(utils.formatSimpleTable(repData))
98 print("\n")
99 self.printTableSize()
100

    def printVerboseReport(self, processed, ignored):
        print("\n\nProcessor Report\n================\n")
        if ignored:
            print("Warning: There were %d errors during classification"%ignored)
        repData = sorted(zip(self.reportDict.values(),
                self.reportDict.keys()),
            key=lambda v: -len(v[0]))
        print("\n%s\n%s\n"%(repData[0][1], "-"*len(repData[0][1])))
        print("%d items\n"%(len(repData[0][0])))
        for items, label in repData[1:]:
            print("\n%s\n%s\n"%(label, "-"*len(label)))
            items.sort()
            print("%d items:\n"%(len(items)))
            print("\n".join(textwrap.wrap(", ".join(items))))
        print("\n")
        self.printTableSize()

    @staticmethod
    def addOptions(parser):
        parser.add_option("--filter", dest="requireFrag", metavar="STR",
            help="Only process files with names containing STR", default=None)
        parser.add_option("--bail", help="Bail out on a processor error,"
            " dumping a traceback", action="store_true", dest="bailOnError",
            default=False)
        parser.add_option("--report", help="Output a report only",
            action="store_true", dest="doReport", default=False)
        parser.add_option("--verbose", help="Be more talkative",
            action="store_true", dest="beVerbose", default=False)
        parser.add_option("--n-procs", "-j", help="Run NUM processes in"
            " parallel", action="store", dest="nParallel", default=1,
            metavar="NUM", type=int)

    _doneSentinel = ("MAGIC: QUEUE DONE",)

    def iterJobs(self, nParallel):
        """executes process() in parallel for all sources and iterates
        over the results.

        We use this rather than multiprocessing's Pool, as Pool cannot
        call methods; this works around that limitation.
        """
        import multiprocessing

        taskQueue = multiprocessing.Queue(nParallel*4)
        doneQueue = multiprocessing.Queue()

        def worker(inQueue, outQueue):
            # each worker process needs its own database connection
            if hasattr(self, "conn"):
                self.conn = base.getDBConnection("trustedquery")
            for srcId in iter(inQueue.get, None):
                if (self.opts.requireFrag is not None
                        and self.opts.requireFrag not in srcId):
                    continue

                try:
                    outQueue.put(self.process(srcId))
                except base.SkipThis:
                    continue
                except Exception as ex:
                    ex.source = srcId
                    if self.opts.bailOnError:
                        sys.stderr.write("*** %s\n"%srcId)
                        traceback.print_exc()
                    outQueue.put(ex)
            outQueue.put(self._doneSentinel)

        activeWorkers = 0

        # close our own connection before forking so the children do
        # not inherit it; re-open it once they are running.
        if hasattr(self, "conn"):
            self.conn.close()
        for i in range(nParallel):
            multiprocessing.Process(target=worker,
                args=(taskQueue, doneQueue)).start()
            activeWorkers += 1
        if hasattr(self, "conn"):
            self.conn = base.getDBConnection("trustedquery")

        toDo = self.iterIdentifiers()
        while True:
            try:
                taskQueue.put(toDo.next())
            except StopIteration:
                break

            # drain finished results while feeding, so the done queue
            # does not grow without bounds
            while not doneQueue.empty():
                yield doneQueue.get()

        for i in range(nParallel):
            taskQueue.put(None)
        taskQueue.close()
        while activeWorkers:
            item = doneQueue.get()
            if item==self._doneSentinel:
                activeWorkers -= 1
            else:
                yield item
203 """calls procFunc for all sources in self.dd.
204
205 This is the default, single-tasking implementation.
206 """
207 processed, ignored = 0, 0
208
209 if nParallel==1:
210 def iterProcResults():
211 for source in self.iterIdentifiers():
212 if (self.opts.requireFrag is not None
213 and not self.opts.requireFrag in source):
214 continue
215
216 try:
217 yield procFunc(source)
218 except base.SkipThis:
219 continue
220 except Exception as ex:
221 ex.source = source
222 if self.opts.bailOnError:
223 sys.stderr.write("*** %s\n"%source)
224 traceback.print_exc()
225 yield ex
226 resIter = iterProcResults()
227 else:
228 resIter = self.iterJobs(nParallel)
229

        while True:
            try:
                res = resIter.next()
                if isinstance(res, Exception):
                    raise res
            except StopIteration:
                break
            except KeyboardInterrupt:
                sys.exit(2)
            except Exception as msg:
                if self.opts.bailOnError:
                    sys.exit(1)
                sys.stderr.write("Skipping source %s: (%s, %s)\n"%(
                    getattr(msg, "source", "(unknown)"), msg.__class__.__name__,
                    repr(msg)))
                ignored += 1
            processed += 1
            sys.stdout.write("%6d (-%5d)\r"%(processed, ignored))
            sys.stdout.flush()
        return processed, ignored

    def iterIdentifiers(self):
        """iterates over all identifiers that should be processed.

        This is usually the paths of the files to be processed.
        You can, however, override it to do something else if that
        fits your problem (example: Previews in SSA use the accref).
        """
        return iter(self.dd.sources)
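
    # For instance, a database-driven processor might yield accrefs
    # rather than paths (a sketch with a made-up table name; compare
    # PreviewMaker.iterIdentifiers below):
    #
    #     def iterIdentifiers(self):
    #         for r in self.conn.queryToDicts("select accref from my.table"):
    #             yield r["accref"]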

    def processAll(self):
        """calls the process method of this processor for all sources of
        the data descriptor dd.
        """
        if self.opts.doReport:
            self.reportDict = {}
            procFunc = self.addClassification
        else:
            procFunc = self.process
        processed, ignored = self._runProcessor(procFunc,
            nParallel=self.opts.nParallel)
        if self.opts.doReport:
            if self.opts.beVerbose:
                self.printVerboseReport(processed, ignored)
            else:
                self.printReport(processed, ignored)
        return processed, ignored
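

# A minimal concrete FileProcessor might look like this (a sketch; the
# class name, resource id, and data descriptor id are made up, and the
# usual gavo.api exports are assumed):
#
#     from gavo import api
#
#     class EmptyFinder(api.FileProcessor):
#         def process(self, fName):
#             if os.path.getsize(fName)==0:
#                 raise Exception("Empty file: %s"%fName)
#
#     if __name__=="__main__":
#         api.procmain(EmptyFinder, "myres/q", "import")
#
# Exceptions raised in process are counted and reported by processAll.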


class HeaderProcessor(FileProcessor):
    """A base for processors doing FITS header manipulations.

    The processor builds naked FITS headers alongside the actual files, with
    an added extension .hdr (or whatever is in the headerExt attribute). The
    presence of such a header indicates that a file has been processed. The
    headers on the actual FITS files are only replaced if necessary.

    The basic flow is: Check if there is a header. If not, call
    _getHeader(srcFile) -> hdr and store hdr in the cache. Insert the
    cached header into the FITS file if it is not there yet.

    You have to implement the _getHeader(srcName) -> pyfits header object
    method. It must raise an exception if it cannot come up with a
    header. You also have to implement _isProcessed(srcName) -> boolean
    returning True if you think srcName already has a processed header.

    This basic flow is influenced by the following opts attributes:

    - reProcess -- even if a cache is present, recompute header values
    - applyHeaders -- actually replace old headers with new headers
    - reHeader -- even if _isProcessed returns True, write a new header
    - compute -- perform computations

    The idea is that you can:

    - generate headers without touching the original files: proc
    - write all cached headers to files that don't have them:
      proc --apply --no-compute
    - after a bugfix, force all headers to be regenerated:
      proc --reprocess --apply --reheader

    All this leads to the messy logic. Sorry 'bout this.
    """
    headerExt = ".hdr"
    maxHeaderBlocks = 40
    cardSequence = None

    def _makeCacheName(self, srcName):
        # NOTE: reconstructed; the cached header sits next to the source
        # file, with headerExt appended.
        return srcName+self.headerExt

    def _writeCache(self, srcName, hdr):
        # NOTE: reconstructed; serialise hdr into the cache file so
        # _readCache can pick it up later.
        with open(self._makeCacheName(srcName), "w") as destF:
            destF.write(fitstools.serializeHeader(hdr))
385 """returns a pyfits header object for the cached result in srcName.
386
387 If there is no cache, None is returned.
388 """
389 src = self._makeCacheName(srcName)
390 if os.path.exists(src):
391 with open(src) as f:
392 hdr = fitstools.readPrimaryHeaderQuick(f, self.maxHeaderBlocks)
393 return hdr
394

    def _makeCache(self, srcName):
        if self.opts.compute:
            if self.opts.beVerbose:
                print("Now computing for", srcName)
            hdr = self._getHeader(srcName)
            if hdr is None:
                raise CannotComputeHeader("_getHeader returned None")
            self._writeCache(srcName, hdr)

    # cards that must always be taken from the source file rather than
    # from the cached header
    keepKeys = set(["SIMPLE", "BITPIX", "NAXIS", "NAXIS1", "NAXIS2",
        "EXTEND", "BZERO", "BSCALE"])

    def _fixHeaderDataKeys(self, srcName, header):
        # NOTE: reconstructed; make the data-shaping cards (keepKeys)
        # match what actually is in the source file.
        oldHeader = self.getPrimaryHeader(srcName)
        for key in self.keepKeys:
            if key in oldHeader:
                header[key] = oldHeader[key]

    def _writeHeader(self, srcName, header):
        # NOTE: reconstructed; push the cached header into the source
        # file itself. The original presumably also applied
        # commentFilter, historyFilter, and cardSequence here.
        self._fixHeaderDataKeys(srcName, header)
        fitstools.replacePrimaryHeaderInPlace(srcName, header)
422 """returns true if the history item value should be preserved.
423 """
424 return True
425
431
433 """override.
434 """
435 return False
436
438 """override this or _getHeader.
439 """
440 return header
441
443 """override this or _mungeHeader.
444 """
445 return self._mungeHeader(srcName, self.getPrimaryHeader(srcName))

    @staticmethod
    def getPrimaryHeader(srcName):
        """returns the primary header of srcName.

        This is a convenience method for user derived classes.
        """
        f = open(srcName)
        hdr = utils.readPrimaryHeaderQuick(f)
        f.close()
        return hdr
    def process(self, srcName):
        if (not (self.opts.reProcess or self.opts.reHeader)
                and self._isProcessed(srcName)):
            return
        cache = self._readCache(srcName)
        if cache is None or self.opts.reProcess:
            self._makeCache(srcName)
            cache = self._readCache(srcName)
        if cache is None:
            return
        if not self.opts.applyHeaders:
            return
        if self.opts.reHeader or not self._isProcessed(srcName):
            self._writeHeader(srcName, cache)

    @staticmethod
    def addOptions(optParser):
        FileProcessor.addOptions(optParser)
        optParser.add_option("--reprocess", help="Recompute all headers",
            action="store_true", dest="reProcess", default=False)
        optParser.add_option("--no-compute", help="Only use cached headers",
            action="store_false", dest="compute", default=True)
        optParser.add_option("--apply", help="Write cached headers to"
            " source files", action="store_true", dest="applyHeaders",
            default=False)
        optParser.add_option("--reheader", help="Write cached headers"
            " to source files even if it looks like they already have"
            " been written", action="store_true", dest="reHeader",
            default=False)
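

# A minimal HeaderProcessor sketch (hypothetical; it forces an OBSERVER
# card into the headers and uses its presence as the "processed" marker):
#
#     class ObserverAdder(api.HeaderProcessor):
#         def _isProcessed(self, srcName):
#             return "OBSERVER" in self.getPrimaryHeader(srcName)
#
#         def _mungeHeader(self, srcName, hdr):
#             hdr["OBSERVER"] = "A. Nonymous"
#             return hdr
#
# Run it once to fill the header cache, then again with --apply to
# rewrite the FITS files themselves.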
490 """A file processor for calibrating FITS frames using astrometry.net.
491
492 It might provide calibration for "simple" cases out of the box. You
493 will usually want to override some solver parameters. To do that,
494 define class attributes sp_<parameter name>, where the parameters
495 available are discussed in helpers.anet's docstring. sp_indices is
496 one thing you will typically need to override.
497
498 To use SExtractor rather than anet's source extractor, override
499 sexControl, to use an object filter (see anet.getWCSFieldsFor), override
500 the objectFilter attribute.
501
502 To add additional fields, override _getHeader and call the parent
503 class' _getHeader method. To change the way astrometry.net is
504 called, override the _solveAnet method (it needs to return some
505 result anet.of getWCSFieldsFor) and call _runAnet with your
506 custom arguments for getWCSFieldsFor.
507
508 See :dachsdoc:`processors#astrometry-net` for details.
509 """
    sexControl = None
    objectFilter = None

    noCopyHeaders = set(["simple", "bitpix", "naxis", "imageh", "imagew",
        "naxis1", "naxis2", "datamin", "datamax", "date"])

    @staticmethod
    def addOptions(optParser):
        HeaderProcessor.addOptions(optParser)
        optParser.add_option("--no-anet", help="Do not run anet, fail if"
            " no cache is present to take anet headers from",
            action="store_false", dest="runAnet", default=True)
        optParser.add_option("--copy-to", help="Copy astrometry.net sandbox to"
            " this directory (WARNING: it will be deleted if it exists!)."
            " Probably most useful with --bail",
            action="store", dest="copyTo", default=None)

    @property
    def solverParameters(self):
        """the solver parameters, collected from the sp_<name> class
        attributes with the sp_ prefix stripped.
        """
        return dict(
            (n[3:], getattr(self, n))
            for n in dir(self) if n.startswith("sp_"))

    def _solveAnet(self, srcName):
        # (method name reconstructed from the class docstring)
        if self.opts.runAnet:
            return self._runAnet(srcName)
        else:
            oldCards = self._readCache(srcName)
            if oldCards is None:
                raise CannotComputeHeader("No cached headers and you asked"
                    " not to run astrometry.net")
            return oldCards.cards

    def _runAnet(self, srcName):
        # NOTE: reconstructed; hand the source over to the astrometry.net
        # wrapper together with our parameters and filters.
        return anet.getWCSFieldsFor(srcName, self.solverParameters,
            self.sexControl, self.objectFilter, self.opts.copyTo,
            self.opts.beVerbose)

    def _getHeader(self, srcName):
        # NOTE: reconstructed; combine the source's primary header with
        # the WCS cards astrometry.net came up with.
        wcsCards = self._solveAnet(srcName)
        if not wcsCards:
            raise CannotComputeHeader("astrometry.net did not find a"
                " solution")
        hdr = self.getPrimaryHeader(srcName)
        fitstricks.copyFields(hdr, wcsCards, self.noCopyHeaders)
        return self._mungeHeader(srcName, hdr)
    def historyFilter(self, value):
        # only keep history entries written by the astrometry.net suite
        return ("suite" in value or
            "blind" in value)
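

# In practice, a concrete calibrator mostly sets solver parameters
# (a sketch; the index file names and sp_endob value are made up):
#
#     class Calibrator(api.AnetHeaderProcessor):
#         sp_indices = ["index-4208.fits", "index-4209.fits"]
#         sp_endob = 100  # any sp_<name> becomes <name> in solverParameters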
580 """A file processor for generating previews.
581
582 For these, define a method getPreviewData(accref) -> string returning
583 the raw preview data.
584 """
585 @staticmethod
587 FileProcessor.addOptions(optParser)
588 optParser.add_option("--force", help="Generate previews even"
589 " where they already exist", action="store_true",
590 dest="force", default=False)
591
593 """iterates over the accrefs in the first table of dd.
594 """
595 tableId = self.dd.makes[0].table.getQName()
596 for r in self.conn.queryToDicts("select accref from %s"%tableId):
597 yield r["accref"]
598

    def _createAuxiliaries(self, dd):
        # NOTE: reconstructed; previews are located through the database,
        # so we need a connection (iterJobs re-opens it per worker).
        self.conn = base.getDBConnection("trustedquery")

    def getPreviewPath(self, accref):
        res = list(self.conn.query("select preview from dc.products where"
            " accref=%(accref)s", {"accref": accref}))
        if not res:
            raise IOError("%s is not in the products table. Update/import"
                " the resource?"%accref)
        if res[0][0]=="AUTO":
            raise base.ReportableError("Preview path in the product table is AUTO."
                " Will not write preview there. Make sure you have properly"
                " bound the preview parameter in //products#define and re-import.")
        return os.path.join(self.inputsDir, res[0][0])

    def classify(self, path):
        if not path.startswith("/"):
            # this is an accref rather than a path; resolve it
            path = self.getPreviewPath(path)

        if self.opts.force:
            return "without"
        if os.path.exists(path):
            return "with"
        else:
            return "without"

    def process(self, accref):
        # NOTE: reconstructed; write getPreviewData's result for
        # everything classified as not yet having a preview.
        if self.classify(accref)=="with":
            return
        path = self.getPreviewPath(accref)
        with open(path, "w") as destF:
            destF.write(self.getPreviewData(accref))


class SpectralPreviewMaker(PreviewMaker):
    linearFluxes = False
    spectralColumn = "spectral"
    fluxColumn = "flux"
    connectPoints = True

    @staticmethod
    def get2DPlot(tuples, linear=False, connectPoints=True):
        """returns a png-compressed pixel image for a 2D plot of (x,y)
        tuples.
        """
        fig = pyplot.figure(figsize=(4,2))
        ax = fig.add_axes([0,0,1,1], frameon=False)

        if linear:
            plotter = ax.plot
        else:
            plotter = ax.semilogy

        if connectPoints:
            linestyle = "-"
        else:
            linestyle = "o"

        plotter(
            [r[0] for r in tuples],
            [r[1] for r in tuples],
            linestyle,
            color="black")
        ax.xaxis.set_major_locator(matplotlib.ticker.NullLocator())
        ax.yaxis.set_major_locator(matplotlib.ticker.NullLocator())
        ax.yaxis.set_minor_locator(matplotlib.ticker.NullLocator())

        rendered = StringIO()
        pyplot.savefig(rendered, format="png", dpi=50)
        pyplot.close()

        rendered = StringIO(rendered.getvalue())
        im = Image.open(rendered)
        im = im.convert("L")
        im = im.convert("P", palette=Image.ADAPTIVE, colors=8)
        compressed = StringIO()
        im.save(compressed, format="png", bits=3)
        return compressed.getvalue()
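
    # A getPreviewData for this class would then essentially be
    # (a sketch; getSpectralPoints is a hypothetical helper fetching
    # (spectral, flux) tuples, e.g. via self.conn):
    #
    #     def getPreviewData(self, accref):
    #         pts = self.getSpectralPoints(accref)
    #         return self.get2DPlot(pts, linear=self.linearFluxes,
    #             connectPoints=self.connectPoints)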


def procmain(processorClass, rdId, ddId):
    """The "standard" main function for processor scripts.

    The function returns the instantiated processor so you can communicate
    from your processor back to your own main.

    See :dachsdoc:`processors.html` for details.
    """
    import optparse
    # rscdesc is imported for its side effect of enabling RD caching
    from gavo import rscdesc
    rd = base.caches.getRD(rdId)
    dd = rd.getById(ddId)

    parser = optparse.OptionParser()
    processorClass.addOptions(parser)
    opts, args = parser.parse_args()
    if args:
        parser.print_help(file=sys.stderr)
        sys.exit(1)
    if opts.beVerbose:
        from gavo.user import logui
        logui.LoggingUI(base.ui)
        base.DEBUG = True

    proc = processorClass(opts, dd)
    processed, ignored = proc.processAll()
    print("%s files processed, %s files with errors"%(processed, ignored))
    return proc
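
# With the addOptions methods above, a script built around procmain
# understands command lines like, e.g.:
#
#     python process.py --report
#     python process.py --filter 2009 -j 4 --bail
#     python process.py --apply --no-compute   (HeaderProcessors only)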