Source code for gavo.helpers.anet

"""
Code to obtain WCS headers for FITS files using astrometry.net

Astrometry.net has oodles of configurable parameters.  Some of
them can be passed in via the solverParameters argument to getWCSFieldsFor,
a dictionary with keys including:

indices
  (default: "index-\*.fits", meaning all indices in your default index dir)
  The file names from anet's index directory you want to have used.
  glob patterns are expanded, but no recursion takes place.

  This could be something like::

    ["index-4211.fits", "index-4210.fits", "index-4209.fits"]

  for largeish images or::

    ["index-4203-*.fits", "index-4202-*.fits"]

  for small ones.  You can also give absolute path names for custom
  indices that live, e.g., in your resource directory.

total_timelimit
  (defaults to 600) -- number of seconds after which the anet run
  should cancel itself.

tweak
  (defaults to True) -- try to obtain a polynomic correction to the
  entire image after obtaining a solution?  This can go wrong in
  particular for exposures with many objects, so you might want to
  set it to off for such cases.

fields
  (default to 1) -- FITS extension to work on

endob
  (defaults to not given) -- last object to be processed.  You don't want to
  raise this too high.  The library will only pass on 10 objects at a
  time anyway, but going too high here will waste lots of time on images
  that are probably not going to resolve anyway.

lower_pix
  (defaults to not given) -- smallest permissible pixel size in arcsecs.  If
  at all possible, constrain this for much better results.

upper_pix
  (defaults to not given) -- largest permissible pixel size in arcsecs.
  See lower_pix.

plots
	(defaults to False) -- generate all kinds of pretty plots?

downsample
	(defaults to not given) -- factor to downsample the image before
	trying to solve.  This may be necessary when not using source-extractor,
	and it should be something like 2, 3, or 4.
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import glob
import gzip
import os
import shutil
import subprocess

from gavo import base
from gavo import utils
from gavo.utils import fitstools
from gavo.utils import pyfits

__docformat__ = "restructuredtext en"

anetIndexPath = "/usr/local/astrometry/data"


PARAM_DEFAULTS = {
		"total_timelimit": 600,
		"fields": "1",
		"endob": None,
		"lower_pix": None,
		"upper_pix": None,
		"tweak": True,
		"pix_units": "app",
		"plots": False,
		"downsample": None,
	}


[docs]class Error(base.Error): pass
[docs]class NotSolved(Error): pass
[docs]class ObjectNotFound(Error): pass
[docs]class ShellCommandFailed(Error): def __init__(self, msg, retcode): Error.__init__(self, msg) self.msg, self.retcode = msg, retcode def __str__(self): return "External program failure (%s). Program output: %s"%( self.retcode, self.msg)
def _feedFile(origDir, fName): """links fName to "in.fits" in the sandbox. If fName ends with .gz, the function assumes it's a gzipped file and unzips it to in.fits instead. """ srcName = os.path.join(origDir, fName) destName = "img.fits" if fName.endswith(".gz"): with open(destName, "wb") as destF: utils.cat(gzip.open(srcName), destF) else: os.symlink(srcName, destName) def _runShellCommand(cmd, args): with open("lastCommand.log", "a") as logF: logF.write("\n\n================\nNow running: %s\n\n"% " ".join([cmd]+args)) logF.flush() proc = subprocess.Popen([cmd]+args, stdout=logF, stderr=subprocess.STDOUT) proc.communicate() if proc.returncode==-2: raise KeyboardInterrupt("Child was siginted") elif proc.returncode: raise ShellCommandFailed(open("lastCommand.log").read(), proc.returncode) def _copyCurrentTree(copyTo): if copyTo is not None: try: shutil.rmtree(copyTo) except os.error: pass shutil.copytree(".", copyTo) # Minimal configuration for source-extractor for anet use. # Do not override values in here in your sourceExtractorConfig-s. _ANET_SX_STUB = """# source-extractor control file for astrometry.net CATALOG_TYPE FITS_1.0 # The output file name CATALOG_NAME img.axy # The name of the file containing _ANET_SX_PARAM PARAMETERS_NAME anet.param FILTER_NAME anet.filter """ # export column spec for source-extractor; referenced in _ANET_SX_STUB _ANET_SX_PARAM = """X_IMAGE Y_IMAGE MAG_ISO FLUX_AUTO ELONGATION """ # Blatantly stolen from anet... _ANET_SX_FILTER = """CONV NORM # 5x5 convolution mask of a gaussian PSF with FWHM = 2.0 pixels. 0.006319 0.040599 0.075183 0.040599 0.006319 0.040599 0.260856 0.483068 0.260856 0.040599 0.075183 0.483068 0.894573 0.483068 0.075183 0.040599 0.260856 0.483068 0.260856 0.040599 0.006319 0.040599 0.075183 0.040599 0.006319 """ def _createSourceExtractorFiles(sourceExtractorControl): with open("anet.control", "w") as f: f.write(_ANET_SX_STUB+sourceExtractorControl) with open("anet.param", "w") as f: f.write(_ANET_SX_PARAM) with open("anet.filter", "w") as f: f.write(_ANET_SX_FILTER) def _extractSources(filterFunc=None): """does source extraction using source-extractor. If filterFunc is not None, it is called before sorting the extracted objects. It must change the file named in the argument in place. """ _runShellCommand("source-extractor", ["-c", "anet.control", "img.fits"]) if filterFunc is not None: filterFunc("img.axy") _PARAMETER_MAP = [ ("total_timelimit", "--cpulimit"), ("fields", "--fields"), ("lower_pix", "--scale-low"), ("upper_pix", "--scale-high"), ("pix_units", "--scale-units"), ("endob", "--depth"), ("downsample", "--downsample"), ] def _addArgsFor(actPars, args): if not actPars["tweak"]: args.append("--no-tweak") if not actPars["plots"]: args.append("--no-plots") for key, opthead in _PARAMETER_MAP: if actPars[key] is not None: args.extend([opthead, str(actPars[key])]) def _solveField(fName, solverParameters, sourceExtractorControl, objectFilter, indexPath, verbose): """tries to solve an image anet's using solve-field. See _resolve for the meaning of the arguments. """ args = ["--continue"] objectSource = "img.fits" if sourceExtractorControl is not None: _createSourceExtractorFiles(sourceExtractorControl) if sourceExtractorControl==None: pass elif sourceExtractorControl=="": args.extend(["--use-source-extractor"]) else: _extractSources(objectFilter) width, height = fitstools.getPrimarySize("img.fits") args.extend(["--x-column", "X_IMAGE", "--y-column", "Y_IMAGE", "--sort-column", "FLUX_AUTO", "--use-source-extractor", "--width", str(width), "--height", str(height)]) objectSource = "img.axy" args.append("-v") with open("backend.cfg", "w") as backendCfg: if "indices" in solverParameters: backendCfg.write(f"add_path {indexPath}\n") for indF in solverParameters["indices"]: fullPath = os.path.join(indexPath, indF) for fName in glob.glob(fullPath): backendCfg.write("index %s\n"%fName) backendCfg.write("inparallel") else: backendCfg.write(f"add_path {indexPath}\nautoindex\n") args.extend(["--backend-config", "backend.cfg"]) actPars = PARAM_DEFAULTS.copy() actPars.update(solverParameters) _addArgsFor(actPars, args) args.append(objectSource) if verbose: print("Running %s %s"%("solve-field", " ".join(args))) _runShellCommand("solve-field", args) # staggered solving: This seemed like a good idea at some point but # now probably no longer is. # minInd, maxInd = int(actPars["startob"]), int(actPars["endob"]) # curStart = minInd # while True: # if curStart+10>maxInd: # break # args.extend(["--depth", "%s-%s"%(curStart+1, curStart+15), "img.fits"]) # _runShellCommand(solverBin, args) # if os.path.exists("img.solved"): # break # curStart += 5 # args[-3:] = [] def _resolve(fName, solverParameters, sourceExtractorControl, objectFilter, copyTo, indexPath, verbose): """runs the astrometric calibration pipeline. solverParameters is a dictionary; most keys in it are simply turned into command line options of solve-field. This function litters the working directory with all kinds of files and does not clean up, so you'd better run it in a sandbox. It raises a NotSolved exception if no solution could be found; otherwise you should find the solution in img.wcs. """ if "_preparationFunction" in solverParameters: solverParameters["_preparationFunction"]() try: _solveField(fName, solverParameters, sourceExtractorControl, objectFilter, indexPath, verbose) except Exception: _copyCurrentTree(copyTo) raise _copyCurrentTree(copyTo) if os.path.exists("img.solved"): return raise NotSolved(fName)
[docs]def getWCSFieldsFor(fName, solverParameters, sourceExtractorControl, objectFilter, copyTo, indexPath, verbose): """returns a pyfits cardlist for the WCS fields on fName. solverParameters is a dictionary mapping solver keys to their values, sourceExtractorControl is a script for source-extractor, and its presence means that source-extractor should be used for source extraction rather than what anet has built in. objectFilter is a function that is called with the name of the FITS with the extracted sources. It can remove or add sources to that file before astrometry.net tries to match. To see what solverParameters are available, check the module docstring. """ try: with utils.sandbox() as origDir: _feedFile(origDir, fName) _resolve(fName, solverParameters=solverParameters, sourceExtractorControl=sourceExtractorControl, objectFilter=objectFilter, copyTo=copyTo, indexPath=indexPath, verbose=verbose) return pyfits.getheader("img.wcs").cards except NotSolved: return None