
Source Code for Module gavo.helpers.processing

"""
An abstract processor and some helping code.

Currently, I assume a plain text interface for those.  It might be
a good idea to use the event mechanism here.
"""

#c Copyright 2008-2019, the GAVO project
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


from __future__ import print_function

from cStringIO import StringIO
import os
import sys
import textwrap
import traceback

import matplotlib
# if the next line fails because the backend is already set, change the import
# order and move processing up.
matplotlib.use("Agg")

from PIL import Image

from gavo import base
from gavo import rsc
from gavo import utils
from gavo.helpers import anet
from gavo.helpers import fitstricks
from gavo.utils import fitstools
from gavo.utils import pyfits

pyplot = utils.DeferredImport("pyplot", "from matplotlib import pyplot")


class CannotComputeHeader(Exception):
    """is raised when no FITS header was generated by a HeaderProcessor.

    Specifically, this is what gets raised when ``_getHeader`` returns ``None``.
    """


class FileProcessor(object):
    """An abstract base for a source file processor.

    In concrete classes, you need to define a ``process(fName)`` method
    receiving a source as returned by the dd (i.e., usually a file name).

    You can override the method ``_createAuxiliaries(dataDesc)`` to compute
    things like source catalogues, etc.  Thus, you should not need to
    override the constructor.

    These objects are usually constructed through ``api.procmain`` as
    discussed in :dachsdoc:`processing.html`.
    """
    inputsDir = base.getConfig("inputsDir")

    def __init__(self, opts, dd):
        self.opts, self.dd = opts, dd
        self._createAuxiliaries(dd)

    def _createAuxiliaries(self, dd):
        # There's been a typo here in previous DaCHS versions; try
        # to call old methods if they are still there
        if hasattr(self, "_createAuxillaries"):
            self._createAuxillaries(dd)

    def classify(self, fName):
        return "unknown"

    def process(self, fName):
        pass

    def addClassification(self, fName):
        label = self.classify(fName)
        self.reportDict.setdefault(label, []).append(os.path.basename(fName))

    def printTableSize(self):
        try:
            tableName = self.dd.makes[0].table.getQName()
            with base.AdhocQuerier(base.getAdminConn) as q:
                itemsInDB = list(q.query("SELECT count(*) from %s"%tableName))[0][0]
            print("Items currently in assumed database table: %d\n"%itemsInDB)
        except (base.DBError, IndexError):
            pass

    def printReport(self, processed, ignored):
        print("\n\nProcessor Report\n================\n")
        if ignored:
            print("Warning: There were %d errors during classification"%ignored)
        repData = sorted(zip(map(len, self.reportDict.values()),
            self.reportDict.keys()))
        print(utils.formatSimpleTable(repData))
        print("\n")
        self.printTableSize()

    def printVerboseReport(self, processed, ignored):
        print("\n\nProcessor Report\n================\n")
        if ignored:
            print("Warning: There were %d errors during classification"%ignored)
        repData = sorted(zip(self.reportDict.values(),
            self.reportDict.keys()),
            key=lambda v: -len(v[0]))
        print("\n%s\n%s\n"%(repData[0][1], "-"*len(repData[0][1])))
        print("%d items\n"%(len(repData[0][0])))
        for items, label in repData[1:]:
            print("\n%s\n%s\n"%(label, "-"*len(label)))
            items.sort()
            print("%d items:\n"%(len(items)))
            print("\n".join(textwrap.wrap(", ".join(items))))
        print("\n")
        self.printTableSize()

    @staticmethod
    def addOptions(parser):
        parser.add_option("--filter", dest="requireFrag", metavar="STR",
            help="Only process files with names containing STR", default=None)
        parser.add_option("--bail", help="Bail out on a processor error,"
            " dumping a traceback", action="store_true", dest="bailOnError",
            default=False)
        parser.add_option("--report", help="Output a report only",
            action="store_true", dest="doReport", default=False)
        parser.add_option("--verbose", help="Be more talkative",
            action="store_true", dest="beVerbose", default=False)
        parser.add_option("--n-procs", "-j", help="Run NUM processes in"
            " parallel", action="store", dest="nParallel", default=1,
            metavar="NUM", type=int)

    _doneSentinel = ("MAGIC: QUEUE DONE",)

    def iterJobs(self, nParallel):
        """executes process() in parallel for all sources and iterates
        over the results.

        We use this rather than multiprocessing's Pool, as that cannot
        call methods.  I'm working around this here.
        """
        import multiprocessing

        taskQueue = multiprocessing.Queue(nParallel*4)
        doneQueue = multiprocessing.Queue()

        def worker(inQueue, outQueue):
            if hasattr(self, "conn"):
                self.conn = base.getDBConnection("trustedquery")
            for srcId in iter(inQueue.get, None):
                if (self.opts.requireFrag is not None
                        and not self.opts.requireFrag in srcId):
                    continue

                try:
                    outQueue.put(self.process(srcId))
                except base.SkipThis:
                    continue
                except Exception as ex:
                    ex.source = srcId
                    if self.opts.bailOnError:
                        sys.stderr.write("*** %s\n"%srcId)
                        traceback.print_exc()
                    outQueue.put(ex)
            outQueue.put(self._doneSentinel)

        # create nParallel workers
        activeWorkers = 0
        # close my connection; it'll be nothing but trouble once
        # the workers see (and close) it, too
        if hasattr(self, "conn"):
            self.conn.close()
        for i in range(nParallel):
            multiprocessing.Process(target=worker,
                args=(taskQueue, doneQueue)).start()
            activeWorkers += 1
        if hasattr(self, "conn"):
            self.conn = base.getDBConnection("trustedquery")

        # feed them their tasks
        toDo = self.iterIdentifiers()
        while True:
            try:
                taskQueue.put(toDo.next())
            except StopIteration:
                break

            while not doneQueue.empty():
                yield doneQueue.get()

        # ask them to quit and wait until all have said they're quitting
        for i in range(nParallel):
            taskQueue.put(None)
        taskQueue.close()
        while activeWorkers:
            item = doneQueue.get()
            if item==self._doneSentinel:
                activeWorkers -= 1
            else:
                yield item

    def _runProcessor(self, procFunc, nParallel=1):
        """calls procFunc for all sources in self.dd.

        This is the default, single-tasking implementation.
        """
        processed, ignored = 0, 0

        if nParallel==1:
            def iterProcResults():
                for source in self.iterIdentifiers():
                    if (self.opts.requireFrag is not None
                            and not self.opts.requireFrag in source):
                        continue

                    try:
                        yield procFunc(source)
                    except base.SkipThis:
                        continue
                    except Exception as ex:
                        ex.source = source
                        if self.opts.bailOnError:
                            sys.stderr.write("*** %s\n"%source)
                            traceback.print_exc()
                        yield ex
            resIter = iterProcResults()
        else:
            resIter = self.iterJobs(nParallel)

        while True:
            try:
                res = resIter.next()
                if isinstance(res, Exception):
                    raise res
            except StopIteration:
                break
            except KeyboardInterrupt:
                sys.exit(2)
            except Exception as msg:
                if self.opts.bailOnError:
                    sys.exit(1)
                sys.stderr.write("Skipping source %s: (%s, %s)\n"%(
                    getattr(msg, "source", "(unknown)"), msg.__class__.__name__,
                    repr(msg)))
                ignored += 1
            processed += 1
            sys.stdout.write("%6d (-%5d)\r"%(processed, ignored))
            sys.stdout.flush()
        return processed, ignored

    def iterIdentifiers(self):
        """iterates over all identifiers that should be processed.

        This is usually the paths of the files to be processed.
        You can, however, override it to do something else if that
        fits your problem (example: Previews in SSA use the accref).
        """
        return iter(self.dd.sources)

    def processAll(self):
        """calls the process method of processor for all sources of the data
        descriptor dd.
        """
        if self.opts.doReport:
            self.reportDict = {}
            procFunc = self.addClassification
        else:
            procFunc = self.process
        processed, ignored = self._runProcessor(procFunc,
            nParallel=self.opts.nParallel)
        if self.opts.doReport:
            if self.opts.beVerbose:
                self.printVerboseReport(processed, ignored)
            else:
                self.printReport(processed, ignored)
        return processed, ignored

    ######### Utility methods

    def getProductKey(self, srcName):
        return utils.getRelativePath(srcName, self.inputsDir)
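
# Example (not part of the module): a minimal FileProcessor subclass as
# described in the class docstring above -- only process() is mandatory,
# while classify() feeds the --report output.  The class name and the
# size check are made up for illustration.
class EmptyFileFinder(FileProcessor):
    def classify(self, fName):
        # bucket sources for "--report" by whether they contain data
        if os.path.getsize(fName)==0:
            return "empty"
        return "ok"

    def process(self, fName):
        # per-source work goes here; raise base.SkipThis to silently
        # skip a source
        if os.path.getsize(fName)==0:
            raise base.SkipThis(fName)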


class ImmediateHeaderProcessor(FileProcessor):
    """A base for processors doing simple FITS manipulations to the primary
    FITS header.

    To define these, override ``_isProcessed(self, srcName, hdr)`` and
    ``_changeHeader(self, srcName, hdr)``.

    ``_changeHeader`` can change the pyfits header ``hdr`` in place.  It
    will then be replaced on the actual file.

    For complex operations, it is probably advisable to use ``HeaderProcessor``,
    which gives you a two-step process of first having the detached headers
    that you can check before applying them.
    """
    def _isProcessed(self, srcName, hdr):
        return False

    def process(self, srcName):
        with open(srcName) as f:
            hdr = fitstools.readPrimaryHeaderQuick(f)
        if not self.opts.reProcess:
            if self._isProcessed(srcName, hdr):
                return

        self._changeHeader(srcName, hdr)
        fitstools.replacePrimaryHeaderInPlace(srcName, hdr)

    @staticmethod
    def addOptions(optParser):
        FileProcessor.addOptions(optParser)
        optParser.add_option("--reprocess", help="Recompute all headers",
            action="store_true", dest="reProcess", default=False)
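
# Example (not part of the module): an ImmediateHeaderProcessor that stamps
# a fixed ORIGIN card into each primary header; keyword and value are
# placeholders chosen for the sketch.
class OriginStamper(ImmediateHeaderProcessor):
    def _isProcessed(self, srcName, hdr):
        return "ORIGIN" in hdr

    def _changeHeader(self, srcName, hdr):
        # hdr is changed in place; process() writes it back to the file
        hdr.set("ORIGIN", "My Data Centre", "Where the file was created")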


class HeaderProcessor(FileProcessor):
    """A base for processors doing FITS header manipulations.

    The processor builds naked FITS headers alongside the actual files, with an
    added extension .hdr (or whatever is in the headerExt attribute).  The
    presence of a FITS header indicates that a file has been processed.  The
    headers on the actual FITS files are only replaced if necessary.

    The basic flow is: Check if there is a header.  If not, call
    _getHeader(srcName) -> hdr.  Store hdr to cache.  Insert cached
    header in the new FITS if it's not there yet.

    You have to implement the _getHeader(srcName) -> pyfits header object
    function.  It must raise an exception if it cannot come up with a
    header.  You also have to implement _isProcessed(srcName) -> boolean
    returning True if you think srcName already has a processed header.

    This basic flow is influenced by the following opts attributes:

    - reProcess -- even if a cache is present, recompute header values
    - applyHeaders -- actually replace old headers with new headers
    - reHeader -- even if _isProcessed returns True, write a new header
    - compute -- perform computations

    The idea is that you can:

    - generate headers without touching the original files: proc
    - write all cached headers to files that don't have them:
      proc --apply --no-compute
    - after a bugfix force all headers to be regenerated:
      proc --reprocess --apply --reheader

    All this leads to the messy logic.  Sorry 'bout this.
    """
    headerExt = ".hdr"
    maxHeaderBlocks = 40
    cardSequence = None

    def _makeCacheName(self, srcName):
        return srcName+self.headerExt

    def _writeCache(self, srcName, hdr):
        dest = self._makeCacheName(srcName)

        # nuke info on sizes that may still lurk; we don't want that in the
        # cache
        hdr = hdr.copy()
        hdr.set("BITPIX", 8)
        hdr.set("NAXIS", 0)
        if "NAXIS1" in hdr:
            del hdr["NAXIS1"]
        if "NAXIS2" in hdr:
            del hdr["NAXIS2"]

        hdr = fitstools.sortHeaders(hdr,
            commentFilter=self.commentFilter,
            historyFilter=self.historyFilter,
            cardSequence=self.cardSequence)

        serialized = fitstools.serializeHeader(hdr)
        with open(dest, "w") as f:
            f.write(serialized)
        hdus = pyfits.open(dest)
        hdus.verify()
        hdus.close()

    def _readCache(self, srcName):
        """returns a pyfits header object for the cached result in srcName.

        If there is no cache, None is returned.
        """
        src = self._makeCacheName(srcName)
        if os.path.exists(src):
            with open(src) as f:
                hdr = fitstools.readPrimaryHeaderQuick(f, self.maxHeaderBlocks)
            return hdr

    def _makeCache(self, srcName):
        if self.opts.compute:
            if self.opts.beVerbose:
                print("Now computing for", srcName)
            hdr = self._getHeader(srcName)
            if hdr is None:
                raise CannotComputeHeader("_getHeader returned None")
            self._writeCache(srcName, hdr)

    # headers copied from original file rather than the cached header
    keepKeys = set(["SIMPLE", "BITPIX", "NAXIS", "NAXIS1", "NAXIS2",
        "EXTEND", "BZERO", "BSCALE"])

    def _fixHeaderDataKeys(self, srcName, header):
        oldHeader = self.getPrimaryHeader(srcName)
        for key in self.keepKeys:
            if key in oldHeader:
                header.set(key, oldHeader[key])

    def commentFilter(self, value):
        """returns true if the comment value should be preserved.

        You may want to override this.
        """
        return True

    def historyFilter(self, value):
        """returns true if the history item value should be preserved.
        """
        return True

    def _writeHeader(self, srcName, header):
        # (body not included in this source listing)

    def _isProcessed(self, srcName):
        """override.
        """
        return False

    def _mungeHeader(self, srcName, header):
        """override this or _getHeader.
        """
        return header

    def _getHeader(self, srcName):
        """override this or _mungeHeader.
        """
        return self._mungeHeader(srcName, self.getPrimaryHeader(srcName))

    @staticmethod
    def getPrimaryHeader(srcName):
        """returns the primary header of srcName.

        This is a convenience function for user derived classes.
        """
        f = open(srcName)
        hdr = utils.readPrimaryHeaderQuick(f)
        f.close()
        return hdr

    def process(self, srcName):
        if (not (self.opts.reProcess or self.opts.reHeader)
                and self._isProcessed(srcName)):
            return
        cache = self._readCache(srcName)
        if cache is None or self.opts.reProcess:
            self._makeCache(srcName)
            cache = self._readCache(srcName)
        if cache is None:
            return
        if not self.opts.applyHeaders:
            return
        if self.opts.reHeader or not self._isProcessed(srcName):
            self._writeHeader(srcName, cache)

    @staticmethod
    def addOptions(optParser):
        FileProcessor.addOptions(optParser)
        optParser.add_option("--reprocess", help="Recompute all headers",
            action="store_true", dest="reProcess", default=False)
        optParser.add_option("--no-compute", help="Only use cached headers",
            action="store_false", dest="compute", default=True)
        optParser.add_option("--apply", help="Write cached headers to"
            " source files", action="store_true", dest="applyHeaders",
            default=False)
        optParser.add_option("--reheader", help="Write cached headers"
            " to source files even if it looks like they already have"
            " been written", action="store_true", dest="reHeader",
            default=False)
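
# Example (not part of the module): a HeaderProcessor that adds a fixed
# OBSERVER card by munging the existing primary header.  Keyword and value
# are placeholders; run it once to build the .hdr caches and again with
# --apply to copy them into the FITS files (see the class docstring).
class ObserverAdder(HeaderProcessor):
    def _isProcessed(self, srcName):
        return "OBSERVER" in self.getPrimaryHeader(srcName)

    def _mungeHeader(self, srcName, hdr):
        hdr.set("OBSERVER", "A. N. Observer")
        return hdr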


class AnetHeaderProcessor(HeaderProcessor):
    """A file processor for calibrating FITS frames using astrometry.net.

    It might provide calibration for "simple" cases out of the box.  You
    will usually want to override some solver parameters.  To do that,
    define class attributes sp_<parameter name>, where the parameters
    available are discussed in helpers.anet's docstring.  sp_indices is
    one thing you will typically need to override.

    To use SExtractor rather than anet's source extractor, override
    sexControl; to use an object filter (see anet.getWCSFieldsFor), override
    the objectFilter attribute.

    To add additional fields, override _getHeader and call the parent
    class' _getHeader method.  To change the way astrometry.net is
    called, override the _solveAnet method (it needs to return some
    result of anet.getWCSFieldsFor) and call _runAnet with your
    custom arguments for getWCSFieldsFor.

    See :dachsdoc:`processors#astrometry-net` for details.
    """
    sexControl = None
    objectFilter = None

    noCopyHeaders = set(["simple", "bitpix", "naxis", "imageh", "imagew",
        "naxis1", "naxis2", "datamin", "datamax", "date"])

    @staticmethod
    def addOptions(optParser):
        HeaderProcessor.addOptions(optParser)
        optParser.add_option("--no-anet", help="Do not run anet, fail if"
            " no cache is present to take anet headers from", action="store_false",
            dest="runAnet", default=True)
        optParser.add_option("--copy-to", help="Copy astrometry.net sandbox to"
            " this directory (WARNING: it will be deleted if it exists!)."
            " Probably most useful with --bail",
            action="store", dest="copyTo", default=None)

    def _isProcessed(self, srcName):
        return "CD1_1" in self.getPrimaryHeader(srcName)

    def _runAnet(self, srcName):
        return anet.getWCSFieldsFor(srcName, self.solverParameters,
            self.sexControl, self.objectFilter, self.opts.copyTo,
            self.opts.beVerbose)

    @property
    def solverParameters(self):
        return dict(
            (n[3:], getattr(self, n))
            for n in dir(self) if n.startswith("sp_"))

    def _solveAnet(self, srcName):
        if self.opts.runAnet:
            return self._runAnet(srcName)
        else:
            oldCards = self._readCache(srcName)
            if oldCards is None:
                raise CannotComputeHeader("No cached headers and you asked"
                    " not to run astrometry.net")
            return oldCards.cards

    def _shouldRunAnet(self, srcName, hdr):
        return True

    def _getHeader(self, srcName):
        hdr = self.getPrimaryHeader(srcName)
        if self._shouldRunAnet(srcName, hdr):
            wcsCards = self._solveAnet(srcName)
            if not wcsCards:
                raise CannotComputeHeader("astrometry.net did not"
                    " find a solution")
            fitstricks.copyFields(hdr, wcsCards, self.noCopyHeaders)
        return self._mungeHeader(srcName, hdr)

    def commentFilter(self, value):
        return ("Index name" in value or
            "Cxdx margin" in value or
            "Field scale lower" in value or
            "Field scale upper" in value or
            "Start obj" in value or
            "End obj" in value or
            "Tweak" in value or
            "code error" in value)

    def historyFilter(self, value):
        return ("suite" in value or
            "blind" in value)
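
# Example (not part of the module): an AnetHeaderProcessor that only pins
# the astrometry.net index files to solve against and skips frames that
# already carry a WCS.  The index names are placeholders, and which sp_...
# parameters exist is defined by helpers.anet's docstring, not here.
class PlateSolver(AnetHeaderProcessor):
    sp_indices = ["index-4110.fits", "index-4111.fits"]

    def _shouldRunAnet(self, srcName, hdr):
        # do not re-solve frames that already have a CD matrix
        return "CD1_1" not in hdr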


class PreviewMaker(FileProcessor):
    """A file processor for generating previews.

    For these, define a method getPreviewData(accref) -> string returning
    the raw preview data.
    """
    @staticmethod
    def addOptions(optParser):
        FileProcessor.addOptions(optParser)
        optParser.add_option("--force", help="Generate previews even"
            " where they already exist", action="store_true",
            dest="force", default=False)

    def iterIdentifiers(self):
        """iterates over the accrefs in the first table of dd.
        """
        tableId = self.dd.makes[0].table.getQName()
        for r in self.conn.queryToDicts("select accref from %s"%tableId):
            yield r["accref"]

    def _createAuxiliaries(self, dd):
        self.previewDir = dd.rd.getAbsPath(
            dd.getProperty("previewDir"))
        if not os.path.isdir(self.previewDir):
            os.makedirs(self.previewDir)
        self.conn = base.getDBConnection("trustedquery")
        FileProcessor._createAuxiliaries(self, dd)

    def getPreviewPath(self, accref):
        res = list(self.conn.query("select preview from dc.products where"
            " accref=%(accref)s", {"accref": accref}))
        if not res:
            raise IOError("%s is not in the products table. Update/import"
                " the resource?"%accref)
        if res[0][0]=="AUTO":
            raise base.ReportableError("Preview path in the product table is AUTO."
                " Will not write preview there. Make sure you have properly"
                " bound the preview parameter in //products#define and re-import.")
        return os.path.join(self.inputsDir, res[0][0])

    def classify(self, path):
        if not path.startswith("/"):
            # It's probably an accref when we're not called from process
            path = self.getPreviewPath(path)

        if self.opts.force:
            return "without"
        if os.path.exists(path):
            return "with"
        else:
            return "without"

    def process(self, accref):
        path = self.getPreviewPath(accref)
        if self.classify(path)=="with":
            return

        dirPart = os.path.dirname(path)
        if not os.path.exists(dirPart):
            os.makedirs(dirPart)
            os.chmod(dirPart, 0775)

        with utils.safeReplaced(path) as f:
            f.write(self.getPreviewData(accref))
        os.chmod(path, 0664)
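
# Example (not part of the module): the smallest conceivable PreviewMaker;
# it writes a flat grey 200x200 PNG for every accref, which is just enough
# to exercise the previewDir property, the products table lookup and the
# --force option.  A real processor would render the actual data here.
class GreyPreviewMaker(PreviewMaker):
    def getPreviewData(self, accref):
        rendered = StringIO()
        Image.new("L", (200, 200), 128).save(rendered, format="png")
        return rendered.getvalue()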


class SpectralPreviewMaker(PreviewMaker):
    linearFluxes = False
    spectralColumn = "spectral"
    fluxColumn = "flux"
    connectPoints = True

    def _createAuxiliaries(self, dd):
        PreviewMaker._createAuxiliaries(self, dd)
        self.sdmDD = self.dd.rd.getById(self.sdmId)

    @staticmethod
    def get2DPlot(tuples, linear=False, connectPoints=True):
        """returns a png-compressed pixel image for a 2D plot of (x,y)
        tuples.
        """
        fig = pyplot.figure(figsize=(4,2))
        ax = fig.add_axes([0,0,1,1], frameon=False)

        if linear:
            plotter = ax.plot
        else:
            plotter = ax.semilogy

        if connectPoints:
            linestyle = "-"
        else:
            linestyle = "o"

        plotter(
            [r[0] for r in tuples],
            [r[1] for r in tuples],
            linestyle,
            color="black")
        ax.xaxis.set_major_locator(matplotlib.ticker.NullLocator())
        ax.yaxis.set_major_locator(matplotlib.ticker.NullLocator())
        ax.yaxis.set_minor_locator(matplotlib.ticker.NullLocator())

        rendered = StringIO()
        pyplot.savefig(rendered, format="png", dpi=50)
        pyplot.close()

        rendered = StringIO(rendered.getvalue())
        im = Image.open(rendered)
        im = im.convert("L")
        im = im.convert("P", palette=Image.ADAPTIVE, colors=8)
        compressed = StringIO()
        im.save(compressed, format="png", bits=3)
        return compressed.getvalue()

    def getPreviewData(self, accref):
        table = rsc.makeData(self.sdmDD, forceSource={
            "accref": accref}).getPrimaryTable()
        data = [(r[self.spectralColumn], r[self.fluxColumn]) for r in table.rows]
        data.sort()

        return self.get2DPlot(data, self.linearFluxes, self.connectPoints)
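
# Example (not part of the module): all a SpectralPreviewMaker subclass has
# to add is the id of the data descriptor that builds the SDM instance for
# an accref; "build_sdm_data" and the column names are placeholders that
# have to match the resource descriptor.
class SdmPreviewMaker(SpectralPreviewMaker):
    sdmId = "build_sdm_data"
    linearFluxes = True
    spectralColumn = "spectral"
    fluxColumn = "flux"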


def procmain(processorClass, rdId, ddId):
    """The "standard" main function for processor scripts.

    The function returns the instantiated processor so you can communicate
    from your processor back to your own main.

    See :dachsdoc:`processors.html` for details.
    """
    import optparse
    from gavo import rscdesc #noflake: for registration
    rd = base.caches.getRD(rdId)
    dd = rd.getById(ddId)

    parser = optparse.OptionParser()
    processorClass.addOptions(parser)
    opts, args = parser.parse_args()
    if args:
        parser.print_help(file=sys.stderr)
        sys.exit(1)
    if opts.beVerbose:
        from gavo.user import logui
        logui.LoggingUI(base.ui)
        base.DEBUG = True

    proc = processorClass(opts, dd)
    processed, ignored = proc.processAll()
    print("%s files processed, %s files with errors"%(processed, ignored))
    return proc
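
# Example (not part of the module): how a processor script typically ends.
# In an actual script you would import procmain and the processor base
# classes from gavo.api (or gavo.helpers.processing) rather than define
# them here; the RD id "myres/q" and the data descriptor id "import" are
# placeholders.
if __name__ == "__main__":
    class MyPreviews(SpectralPreviewMaker):
        sdmId = "build_sdm_data"

    procmain(MyPreviews, "myres/q", "import")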