Source code for gavo.utils.fitstools

"""
Some utility functions to deal with FITS files.

It's unclear if pyfits coming from astropy is now threadsafe, but I'm hoping
it might be, so for now I'm making fitslock a no-op.
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.

import builtins
import datetime
import itertools
import re
import struct
import threading
import warnings
from contextlib import contextmanager

import numpy
# temporary measure to shut up astropy's configuration parser.
builtins._ASTROPY_SETUP_ = True  # type:ignore
# the following is the original import of pyfits, and only it should be used
from astropy.io import fits as pyfits # type:ignore
from astropy.utils.exceptions import AstropyDeprecationWarning # type:ignore
warnings.filterwarnings('ignore', category=AstropyDeprecationWarning)

from gavo.utils import codetricks
from gavo.utils import excs
from gavo.utils import ostricks

from gavo.utils.dachstypes import (Any, BinaryIO, Callable, Dict, Filename,
	Generator, List, NDArray, Optional, StrOrBytes, StrToStrMap, Tuple)



_FITS_TABLE_LOCK = threading.RLock()

@contextmanager
def fitsLock():
	_FITS_TABLE_LOCK.acquire()
	try:
		yield
	finally:
		_FITS_TABLE_LOCK.release()

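# A minimal usage sketch (the file name is made up): serialize pyfits
# access across threads by wrapping the calls in fitsLock:
#
#	with fitsLock():
#		hdus = pyfits.open("image.fits")
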
CARD_SIZE = 80
END_CARD = b'END'+b' '*(CARD_SIZE-3)
FITS_BLOCK_SIZE = CARD_SIZE*36

class FITSError(Exception):
	pass

# pyfits is a bit too liberal in throwing deprecation warnings.  Filter them
# for now.  TODO: Figure out a system to check them now and then.
warnings.filterwarnings('ignore', category=DeprecationWarning)
try:
	warnings.filterwarnings('ignore', category=pyfits.PyfitsDeprecationWarning)
except AttributeError:
	# pyfits too old to produce these: Good.
	pass

def padCard(input: StrOrBytes, length: int=CARD_SIZE) -> bytes:
	"""pads input (a string or bytes) with blanks until len(result)%80=0

	The length keyword argument lets you adjust the "card size".  Use
	this to pad headers with length=FITS_BLOCK_SIZE.

	This always returns bytes.

	>>> padCard("")
	b''
	>>> len(padCard(b"end"))
	80
	>>> len(padCard(b"whacko"*20))
	160
	>>> len(padCard("junkodumnk"*17, 27))%27
	0
	"""
	# This is like pyfits._pad, but I'd rather not depend on pyfits
	# internals too much.
	if isinstance(input, str):
		input = input.encode("ascii")
	l = len(input)
	if not l%length:
		return input
	return input+b' '*(length-l%length)

def readHeaderBytes(f: BinaryIO, maxHeaderBlocks: int=80) -> bytes:
	"""returns the bytes belonging to a FITS header starting at the current
	position within the file f.

	If the header is not complete after reading maxHeaderBlocks blocks,
	a FITSError is raised.
	"""
	if hasattr(f, "encoding"):
		raise IOError("fitstools only works with files opened in binary mode.")
	parts = []

	while True:
		block = f.read(FITS_BLOCK_SIZE)
		if not block:
			raise EOFError('Premature end of file while reading header')
		parts.append(block)

		endCardPos = block.find(END_CARD)
		if not endCardPos%CARD_SIZE:
			break

		if len(parts)>=maxHeaderBlocks:
			raise FITSError("No end card found within %d blocks"%maxHeaderBlocks)
	return b"".join(parts)

def readPrimaryHeaderQuick(f: BinaryIO, maxHeaderBlocks: int=80
		) -> pyfits.Header:
	"""returns a pyfits header for the primary hdu of the opened file f.

	f must be opened in binary mode.

	This is mostly code lifted from pyfits._File._readHDU.  The way that
	class is made, it's hard to use it with stuff from a gzipped source,
	and that's why this function is here.  It is used in the quick mode
	of fits grammars.

	This function is adapted from pyfits.
	"""
	return pyfits.Header.fromstring(
		readHeaderBytes(f, maxHeaderBlocks).decode("ascii", "ignore"))

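# Sketch of the intended use case (file name made up): pulling a header
# from a gzipped FITS opened as a plain binary stream:
#
#	import gzip
#	with gzip.open("image.fits.gz", "rb") as f:
#		hdr = readPrimaryHeaderQuick(f)
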
def parseCards(aString: StrOrBytes) -> List[pyfits.Card]:
	"""returns a list of pyfits Cards parsed from aString.

	This will raise a ValueError if aString's length is not divisible by 80.
	It may also raise pyfits errors for malformed cards.

	Empty (i.e., all-whitespace) cards are ignored.  If an END card is
	encountered, processing is aborted.
	"""
	if isinstance(aString, str):
		aString = aString.encode("ascii")
	cards = []
	if len(aString)%CARD_SIZE:
		raise ValueError("parseCards argument has impossible length %s"%(
			len(aString)))

	for offset in range(0, len(aString), CARD_SIZE):
		rawCard = aString[offset:offset+CARD_SIZE]
		if rawCard==END_CARD:
			break
		if not rawCard.strip():
			continue
		cards.append(pyfits.Card().fromstring(rawCard.decode("ascii")))
	return cards

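# Sketch (card images made up): parsing two raw cards; padCard brings
# each literal to the required 80 bytes:
#
#	cards = parseCards(padCard(b"SIMPLE  =                    T")
#		+padCard(b"BITPIX  =                   16"))
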
def serializeHeader(hdr: pyfits.Header) -> bytes:
	"""returns the FITS serialization of a FITS header hdr.

	This is always bytes.
	"""
	parts = []
	for card in hdr.cards:
		r = card.image.encode("ascii")
		assert not len(r)%CARD_SIZE
		parts.append(r)
	serForm = b"".join(parts)+padCard('END')
	return padCard(serForm, length=FITS_BLOCK_SIZE)

def replacePrimaryHeader(
		inputFile: BinaryIO,
		newHeader: pyfits.Header,
		targetFile: BinaryIO,
		bufSize: int=100000) -> None:
	"""writes a FITS to targetFile having newHeader as the primary header,
	where the rest of the data is taken from inputFile.

	inputFile must be a file freshly opened for binary reading, targetFile
	one freshly opened for binary writing.

	This function is (among other things) a workaround for pyfits' misfeature
	of unscaling scaled data in images when extending a header.
	"""
	readPrimaryHeaderQuick(inputFile)
	targetFile.write(serializeHeader(newHeader))

	while True:
		buf = inputFile.read(bufSize)
		if not buf:
			break
		targetFile.write(buf)

def replacePrimaryHeaderInPlace(
		fitsName: Filename,
		newHeader: pyfits.Header) -> None:
	"""replaces the primary header of fitsName with newHeader.

	Doing this, it tries to minimize the amount of writing necessary; if
	fitsName has enough space for newHeader, just the header is written,
	and newHeader is extended if necessary.  Only if newHeader is longer
	than the existing header is fitsName actually copied.  We try to be
	safe in this case, only overwriting the old entry when the new data
	is safely on disk.

	gzipped inputs used to be supported here, but they aren't any more.
	"""
	if fitsName.endswith(".gz"):
		raise NotImplementedError("replacePrimaryHeaderInPlace no longer"
			" supports gzipped files.")

	serializedNew = serializeHeader(newHeader)

	with open(fitsName, "rb") as inputFile:
		serializedOld = readHeaderBytes(inputFile)
		inputFile.seek(0)

		if len(serializedNew)<len(serializedOld):
			# the new header is shorter than the old one; pad it with empty
			# cards, then make sure the end card is in the last block
			serializedNew = serializedNew+(
				len(serializedOld)-len(serializedNew))*b" "
			serializedNew = serializedNew.replace(END_CARD, b" "*len(END_CARD))
			serializedNew = serializedNew[:-len(END_CARD)]+END_CARD
			assert len(serializedNew)==len(serializedOld)

		if len(serializedNew)==len(serializedOld):
			# header lengths match (after possible padding); just write
			# the new header and be done
			with open(fitsName, "rb+") as targetFile:
				targetFile.seek(0)
				targetFile.write(serializedNew)

		else:
			# New header is longer than the old one, write the whole mess.
			with ostricks.safeReplaced(fitsName) as targetFile:
				replacePrimaryHeader(inputFile, newHeader, targetFile)

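# Sketch of the intended pattern (file name made up): read the header,
# tweak a card, and write it back with minimal I/O:
#
#	with open("image.fits", "rb") as f:
#		hdr = readPrimaryHeaderQuick(f)
#	hdr.set("OBJECT", "M31")
#	replacePrimaryHeaderInPlace("image.fits", hdr)
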
# enforced sequence of well-known keywords, and whether they are mandatory
STANDARD_CARD_SEQUENCE = [
	("SIMPLE", True),
	("BITPIX", True),
	("NAXIS", True),
	("NAXIS1", False),
	("NAXIS2", False),
	("NAXIS3", False),
	("NAXIS4", False),
	("EXTEND", False),
	("BZERO", False),
	("BSCALE", False),
]

def _iterForcedPairs(seq: List) -> Generator:
	"""helps _enforceHeaderConstraints.
	"""
	if seq is None:
		return
	for item in seq:
		if isinstance(item, tuple):
			yield item
		else:
			yield (item, False)

def _enforceHeaderConstraints(
		cardList: List[pyfits.Card],
		cardSequence: Optional[List[Tuple[str, bool]]]):
	"""returns a pyfits header containing the cards in cardList with
	FITS sequence constraints satisfied.

	This may raise a FITSError if mandatory cards are missing.

	This will only work correctly for image FITSes with less than five
	dimensions.

	On cardSequence, see sortHeaders (except that this function always
	requires sequences of pairs).
	"""
	# I can't use pyfits.verify for this since cardList may not refer to
	# a data set that's actually in memory
	cardsAdded = set()
	newCards: List[pyfits.Card] = []
	cardDict = dict((card.keyword, card) for card in cardList)

	for kw, mandatory in itertools.chain(
			STANDARD_CARD_SEQUENCE, _iterForcedPairs(cardSequence or [])):
		if isinstance(kw, pyfits.Card):
			newCards.append(kw)
			continue

		if kw in cardsAdded:
			continue

		try:
			newCards.append(cardDict[kw])
			cardsAdded.add(kw)
		except KeyError:
			if mandatory:
				raise FITSError("Mandatory card '%s' missing"%kw)

	for card in cardList:
		# use cardList rather than cardDict to maintain cardList order
		if card.keyword not in cardsAdded or card.keyword=='':
			newCards.append(card)
	return pyfits.Header(newCards)

def sortHeaders(header: pyfits.Header,
		commentFilter: Optional[Callable[[str], bool]]=None,
		historyFilter: Optional[Callable[[str], bool]]=None,
		cardSequence: Optional[List[Tuple[str, bool]]]=None) -> pyfits.Header:
	"""returns a pyfits header with "real" cards first, then history,
	then comment cards.

	Blanks in the input are discarded, and one blank each is added in
	between the sections of real cards, history and comments.

	Header can be an iterable yielding Cards or a pyfits header.

	Duplicate history or comment entries will be swallowed.

	cardSequence, if present, is a sequence of (item, mandatory) pairs.
	There, item can be a card name, in which case the corresponding card
	will be inserted at this point in the sequence.  If mandatory is True,
	a missing card is an error.  Keywords already in
	fitstools.STANDARD_CARD_SEQUENCE are ignored.

	Item can also be a pyfits.Card instance; it will be put into the
	header as-is.

	As a shortcut, a sequence item may be something other than a tuple;
	it will then be combined with a False to make one.

	These days, if you think you need this, have a look at
	fitstricks.makeHeaderFromTemplate first.
	"""
	commentCs, historyCs, realCs = [], [], []
	for card in getattr(header, "cards", header):
		if card.keyword=="COMMENT":
			commentCs.append(card)
		elif card.keyword=="HISTORY":
			historyCs.append(card)
		else:
			realCs.append(card)

	newCards = []
	for card in realCs:
		newCards.append(card)

	historySeen = set()
	if historyCs:
		newCards.append(pyfits.Card(keyword=""))
	for card in historyCs:
		if historyFilter is None or historyFilter(card.value):
			if card.value not in historySeen:
				newCards.append(card)
				historySeen.add(card.value)

	commentsSeen = set()
	if commentCs:
		newCards.append(pyfits.Card(keyword=""))
	for card in commentCs:
		if commentFilter is None or commentFilter(card.value):
			if card.value not in commentsSeen:
				commentsSeen.add(card.value)
				newCards.append(card)

	return _enforceHeaderConstraints(newCards, cardSequence)

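# Sketch (keywords made up): force OBSID right after the standard cards
# and treat it as mandatory; DATE-OBS uses the non-tuple shortcut:
#
#	newHdr = sortHeaders(hdr, cardSequence=[("OBSID", True), "DATE-OBS"])
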
def openFits(fitsName: Filename) -> pyfits.HDUList:
	"""returns the hdus for fitsName.

	This is simulating legacy pyfits behaviour in that it will use
	memmap I/O if it thinks that works.  You'll probably want to use
	astropy directly these days if this gives you a headache.
	"""
	hdus = pyfits.open(fitsName, memmap=True)
	# memmapping will bomb later if data needs to be scaled, so we
	# guess now if that's going to happen
	if (hdus[0].header.get("BZERO", 0)!=0
			or hdus[0].header.get("BSCALE", 1)!=1
			or "BLANK" in hdus[0].header):
		hdus.close()
		hdus = pyfits.open(fitsName, memmap=False)
	return hdus

def getAxisLengths(hdr: pyfits.Header) -> List[int]:
	"""returns a sequence of the lengths of the axes of a FITS image
	described by hdr.
	"""
	return [hdr["NAXIS%d"%i] for i in range(1, hdr["NAXIS"]+1)]

_FitsCuts = Tuple[Optional[float], ...]

def cutoutHeader(header: pyfits.Header, *cuts: _FitsCuts
		) -> Tuple[List, pyfits.Header]:
	"""returns array slices and a new pyfits header reflecting cuts.

	(see cutoutFITS for what cuts is).
	"""
	cutDict = dict((c[0], c[1:]) for c in cuts)
	slices = []
	newHeader = header.copy()

	for index, length in enumerate(getAxisLengths(header)):
		firstPix, lastPix = cutDict.get(index+1, (None, None))

		if firstPix is None:
			firstPix = 1
		if lastPix is None:
			lastPix = length
		firstPix = min(max(1, firstPix), length)
		lastPix = min(length, max(1, lastPix))

		if (firstPix, lastPix)==(1, length):
			slices.append(slice(None, None, None))
		else:
			firstPix -= 1
			newAxisLength = lastPix-firstPix
			if newAxisLength==0:
				newAxisLength = 1
				lastPix = firstPix+1
			slices.append(slice(int(firstPix), int(lastPix), 1))
			newHeader["NAXIS%d"%(index+1)] = newAxisLength
			refpixKey = "CRPIX%d"%(index+1)
			newHeader.set(refpixKey, newHeader[refpixKey]-firstPix)

	return slices, newHeader

def cutoutFITS(hdu: pyfits.ImageHDU, *cuts: _FitsCuts) -> pyfits.ImageHDU:
	"""returns a cutout of hdu restricted to cuts.

	hdu is a primary FITS hdu.  cuts is a list of cut specs, each of which
	is a triple (axis, lower, upper).  axis is between 1 and naxis, lower
	and upper are 1-based pixel coordinates of the limits, and "border"
	pixels are included.  Specifications outside of the image are legal
	and will be cropped back.  Open limits are supported via a
	specification of None.

	If an axis would vanish (i.e. length 0 or less), the function fudges
	things such that the axis gets a length of 1.

	axis is counted here in the FORTRAN/FITS sense, *not* in the C sense,
	i.e., axis=1 cuts along NAXIS1, which is the *last* index in a numpy
	array.

	WCS CRPIXes in hdu's header will be updated.  Axes not specified will
	not be touched.  It is an error to specify cuts for an axis twice
	(behaviour is undefined).

	Note that this will lose all extensions the original FITS file might
	have had.
	"""
	slices, newHeader = cutoutHeader(hdu.header, *cuts)
	slices.reverse()
	newHDU = hdu.__class__(data=hdu.data[tuple(slices)].copy(order='C'),
		header=newHeader)

	# Fix astropy 1.3 foulup
	if "BZERO" in newHeader:
		newHDU.header["BZERO"] = newHeader["BZERO"]
	if "BSCALE" in newHeader:
		newHDU.header["BSCALE"] = newHeader["BSCALE"]

	return newHDU

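# Sketch: cut pixels 100..200 along NAXIS1 and cut NAXIS2 from pixel 50
# to the upper border (None means an open limit):
#
#	newHDU = cutoutFITS(hdus[0], (1, 100, 200), (2, 50, None))
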
def shrinkWCSHeader(oldHeader: pyfits.Header, factor: float) -> pyfits.Header:
	"""returns a FITS header suitable for a shrunken version of the image
	described by oldHeader.

	This only works for 2d images; factor must be an integer>1.  The
	function assumes no "fractional" pixels are handled, i.e., remainders
	in divisions with factors are discarded.  It is thus a companion for
	iterScaledRows.

	Note that oldHeader must be an actual pyfits header instance; a
	dictionary will not do.

	This is a pretty straight port of wcstools's imutil.c#ShrinkFITSHeader,
	except we clear BZERO and BSCALE and set BITPIX to -32 (float array)
	under the assumption that in the returned image, 32-bit floats are used.
	"""
	assert oldHeader["NAXIS"]==2

	factor = int(factor)
	newHeader = oldHeader.copy()
	newHeader.set("SIMPLE", True, "GAVO DaCHS, %s"%datetime.datetime.utcnow())
	newHeader["NAXIS1"] = oldHeader["NAXIS1"]//factor
	newHeader["NAXIS2"] = oldHeader["NAXIS2"]//factor
	newHeader["BITPIX"] = -32

	try:
		ffac = float(factor)
		newHeader["CRPIX1"] = oldHeader["CRPIX1"]/ffac+0.5
		newHeader["CRPIX2"] = oldHeader["CRPIX2"]/ffac+0.5
		for key in ("CDELT1", "CDELT2", "CD1_1", "CD2_1", "CD1_2", "CD2_2"):
			if key in oldHeader:
				newHeader[key] = oldHeader[key]*ffac
	except KeyError:
		# no WCS, we're fine either way
		pass

	newHeader.set("IMSHRINK", "Image scaled down %s-fold by DaCHS"%factor)

	for hField in ["BZERO", "BSCALE"]:
		if hField in newHeader:
			del newHeader[hField]

	return newHeader

NUM_CODE = {
	8: 'uint8',
	16: '>i2',
	32: '>i4',
	64: '>i8',
	-32: '>f4',
	-64: '>f8'}

def _makeDecoder(hdr: pyfits.Header) -> Callable[[BinaryIO], NDArray]:
	"""returns a decoder for the rows of FITS primary image data.

	The decoder is called with an open file and returns the next row.
	You need to keep track of the total number of rows yourself.
	"""
	numType = NUM_CODE[hdr["BITPIX"]]
	rowLength = hdr["NAXIS1"]
	nBytes = abs(rowLength*hdr["BITPIX"]//8)

	bzero, bscale = hdr.get("BZERO", 0), hdr.get("BSCALE", 1)
	if bzero!=0 or bscale!=1:
		def read(f: BinaryIO) -> NDArray:
			return numpy.asarray(
				numpy.frombuffer(f.read(nBytes), numType, rowLength),
				'float32')*bscale+bzero
	else:
		def read(f: BinaryIO) -> NDArray:  #noflake: conditional definition
			return numpy.frombuffer(f.read(nBytes), numType, rowLength)

	return read

def iterFITSRows(hdr: pyfits.Header, f: BinaryIO
		) -> Generator[NDArray, None, None]:
	"""iterates over the rows of a FITS (primary) image.

	You pass in a FITS header and a file positioned to the start of
	the image data.

	What's returned are 1d numpy arrays of the datatype implied by bitpix.
	The function understands only very basic FITSes (BSCALE and BZERO are
	known, though, and lead to float arrays).

	We do this ourselves since pyfits may pull in the whole thing or at
	least mmaps it; both are not attractive when I want to stream-process
	large images.
	"""
	decoder = _makeDecoder(hdr)
	for col in range(hdr["NAXIS2"]):
		yield decoder(f)

def _iterSetupFast(inFile: BinaryIO, hdr: pyfits.Header
		) -> Tuple[pyfits.Header, Generator[NDArray, None, None]]:
	"""helps iterScaledRows for the case of a simple, real-file FITS.
	"""
	if hdr is None:
		hdr = readPrimaryHeaderQuick(inFile)

	if hdr["NAXIS"]==0:
		# presumably a compressed FITS
		return _iterSetupCompatible(inFile, hdr)

	return hdr, iterFITSRows(hdr, inFile)

def fixImageExtind(hdus: List, extInd: int) -> int:
	"""returns the index of a compressed image HDU if it immediately follows
	extInd.

	If hdus[extInd] is a CompImageHDU itself, this will return extInd
	unchanged.
	"""
	if isinstance(hdus[extInd], pyfits.CompImageHDU):
		return extInd

	try:
		if isinstance(hdus[extInd+1], pyfits.CompImageHDU):
			return extInd+1
	except IndexError:
		# no extension following, least of all a CompImageHDU.  Fall through
		pass

	return extInd

def _iterSetupCompatible(
		inFile: BinaryIO,
		hdr: pyfits.Header,
		extInd: int=0
		) -> Tuple[pyfits.Header, Generator[NDArray, None, None]]:
	"""helps iterScaledRows for when _iterSetupFast will not work.

	Using extInd, you can select a different extension.  extInd=0 will
	automatically select extension 1 if that's a compressed image HDU.
	"""
	hdus = pyfits.open(inFile)
	extInd = fixImageExtind(hdus, extInd)

	def iterRows():
		for row in hdus[extInd].data[:,:]:
			yield row

	return hdus[extInd].header, iterRows()

def iterScaledRows(
		inFile: BinaryIO,
		factor: Optional[float]=None,
		destSize: Optional[int]=None,
		hdr: Optional[pyfits.Header]=None,
		slow: bool=False,
		extInd: int=0) -> Generator[NDArray, None, None]:
	"""iterates over numpy arrays of pixel rows within the open FITS
	stream inFile, scaled down by the integer in factor.

	The arrays are always float32, regardless of the input.  When the
	image size is not a multiple of factor, border pixels are discarded.

	A FITS header for this data can be obtained using shrinkWCSHeader.

	You can pass in either a factor the file is to be scaled down by, or
	an approximate size desired.  If both are given, factor takes
	precedence; if neither is given, it's an error.

	If you pass in hdr, it will not be read from inFile; the function then
	expects the file pointer to point to the start of the first data block.
	Use this if you've already read the header of a non-seekable FITS.

	extInd lets you select a different extension.  extInd=0 means the first
	image HDU, which may be in extension 1 for compressed images.

	iterScaledRows will try to use a hand-hacked interface guaranteed to
	stream.  This only works for plain, 2D-FITSes from real files.
	iterScaledRows normally notices when it should fall back to pyfits
	code, but if it doesn't, you can pass slow=True.
	"""
	if hasattr(inFile, "read") and not slow and extInd==0:
		hdr, rows = _iterSetupFast(inFile, hdr)
	else:
		hdr, rows = _iterSetupCompatible(inFile, hdr, extInd)

	if factor is None:
		if destSize is None:
			raise excs.DataError("iterScaledRows needs either factor or destSize.")
		size = max(hdr["NAXIS1"], hdr["NAXIS2"])
		factor = max(1, size//destSize+1)

	factor = int(factor)
	assert factor>0

	rowLength = hdr["NAXIS1"]
	destRowLength = rowLength//factor
	summedInds = list(range(factor))

	for index in range(hdr["NAXIS2"]//factor):
		newRow = numpy.zeros((rowLength,), 'float32')
		for i in summedInds:
			try:
				newRow += next(rows)
			except StopIteration:
				break
		newRow /= factor

		# horizontal scaling via reshaping to a matrix and then summing over
		# its columns...
		newRow = newRow[:destRowLength*factor]
		# mypy 1.0.1 type inference bad
		yield sum(numpy.transpose(  # type: ignore
			(newRow/factor).reshape((destRowLength, factor))))

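# Sketch (file name made up): building a quicklook-sized array from a
# large 2D FITS without loading it as a whole:
#
#	with open("big.fits", "rb") as f:
#		scaled = numpy.array(list(iterScaledRows(f, destSize=200)))
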
def iterScaledBytes(
		inFileName: Filename,
		factor: float,
		extraCards: StrToStrMap={}) -> Generator[bytes, None, None]:
	"""iterates over the bytes for a simple FITS file generated by scaling
	down the 2D image inFileName by factor.

	factor must be an integer between 1 and something reasonable.

	This function acquires the FITS lock itself.
	"""
	with fitsLock():
		with open(inFileName, "rb") as f:
			oldHdr = readPrimaryHeaderQuick(f)
			newHdr = shrinkWCSHeader(oldHdr, factor)
			newHdr.update(extraCards)
			yield serializeHeader(newHdr)

			for row in iterScaledRows(f, factor, hdr=oldHdr):
				# Unfortunately, numpy's byte swapping for floats is broken in
				# many wide-spread revisions.  So, we cannot do the fast
				#	yield row.newbyteorder(">").tostring()
				# but rather, for now, have to try the slow:
				yield struct.pack("!%df"%len(row), *row)

def headerFromDict(d: StrToStrMap) -> pyfits.Header:
	"""returns a primary header sporting the key/value pairs given in the
	dictionary d.

	In all likelihood, this header will *not* work properly as a primary
	header because, e.g., there are certain rules on the sequence of
	header items.  fitstricks.copyFields can make a valid header out of
	what's returned here.

	keys mapped to None are skipped, i.e., you have to do nullvalue handling
	yourself.
	"""
	hdr = pyfits.PrimaryHDU().header
	for key, value in d.items():
		if value is not None:
			hdr.set(key, value)
	return hdr

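# Sketch: None values are skipped, everything else becomes a card:
#
#	hdr = headerFromDict({"OBJECT": "M31", "AIRMASS": None})
#	# -> a header with an OBJECT card but no AIRMASS card
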
class WCSAxis(object):
	"""represents a single 1D WCS axis and allows easy metadata discovery.

	You'll usually use the fromHeader constructor.

	The idea of using this rather than astropy.wcs or similar is that this
	is simple and robust.  It doesn't know many of the finer points of WCS,
	though, and in particular it's 1D only.  However, for the purposes of
	cutouts it probably should do for the overwhelming majority of
	non-spatial FITS axes.

	The default pixel coordinates are handled in the FITS sense here, i.e.,
	the first pixel has the index 1.  There are methods that have pix0 in
	their names; these assume 0-based arrays.  All the transforms return
	Nones unchanged.

	To retrieve the metadata shoved in, use the name, crval, crpix, cdelt,
	ctype, cunit, and axisLength attributes.
	"""
	def __init__(self, name: str,
			crval: float, crpix: float, cdelt: float,
			ctype: str="UNKNOWN", cunit: Optional[str]="", axisLength: int=1):
		assert cdelt!=0
		self.crval, self.crpix, self.cdelt = crval, crpix, cdelt
		self.ctype, self.cunit, self.axisLength = ctype, cunit, axisLength
		self.name = name

	def pixToPhys(self, pixCoo: float) -> float:
		"""returns the physical value for a 1-based pixel coordinate.
		"""
		if pixCoo is None:
			return None
		return self.crval+(pixCoo-self.crpix)*self.cdelt

	def pix0ToPhys(self, pix0Coo: float) -> float:
		"""returns the physical value for a 0-based pixel coordinate.
		"""
		if pix0Coo is None:
			return None
		return self.pixToPhys(pix0Coo+1)

	def physToPix(self, physCoo: float) -> float:
		"""returns a 1-based pixel coordinate for a physical value.
		"""
		if physCoo is None:
			return None
		return (physCoo-self.crval)/self.cdelt+self.crpix

	def physToPix0(self, physCoo: float) -> float:
		"""returns a 0-based pixel coordinate for a physical value.
		"""
		if physCoo is None:
			return None
		return self.physToPix(physCoo)-1

	def getLimits(self) -> Tuple[float, float]:
		"""returns the minimal and maximal physical values this axis
		takes within the image.
		"""
		limits = self.pixToPhys(1), self.pixToPhys(self.axisLength)
		return min(limits), max(limits)

	@classmethod
	def fromHeader(cls, header: pyfits.Header, axisIndex: int,
			forceSeparable: bool=False) -> 'WCSAxis':
		"""returns a WCSAxis for the specified axis in header.

		If the axis is mentioned in a transformation matrix (CD or PC),
		a ValueError is raised unless forceSeparable is passed; this class
		is strictly for 1D coordinates.

		The axisIndex is 1-based; to get a transform for the axis described
		by CTYPE1, pass 1 here.
		"""
		if ("CD%d_%d"%(axisIndex, axisIndex) in header
				or "PC%d_%d"%(axisIndex, axisIndex) in header):
			if not forceSeparable:
				raise ValueError("FITS axis %s appears not separable.  WCSAxis"
					" cannot handle this."%axisIndex)

		def get(key, default):
			return header.get("%s%d"%(key, axisIndex), default)

		guessedName = get("CNAME", "").strip()
		if not guessedName:
			guessedName = get("CTYPE", "").split("-", 1)[0].strip()
		if not guessedName or guessedName=="UNKNOWN":
			guessedName = "COO"
		guessedName = "%s_%d"%(guessedName, axisIndex)

		return cls(guessedName,
			get("CRVAL", 0), get("CRPIX", 0), get("CDELT", 1),
			get("CTYPE", "UNKNOWN").strip(),
			get("CUNIT", "").strip() or None,
			get("NAXIS", 1))

# WCSAxis.fromHeader wrapper for rmkfuncs
@codetricks.document
def getWCSAxis(
		header: pyfits.Header,
		axisIndex: int,
		forceSeparable: bool=False) -> WCSAxis:
	"""returns a WCSAxis instance from an axis index and a FITS header.

	If the axis is mentioned in a transformation matrix (CD or PC), a
	``ValueError`` is raised (use ``forceSeparable`` to override).

	The ``axisIndex`` is 1-based; to get a transform for the axis described
	by CTYPE1, pass 1 here.

	The object returned has methods like ``pixToPhys``, ``physToPix`` (and
	their ``pix0`` brethren), and ``getLimits``.

	Note that at this point WCSAxis only supports linear transforms (it's
	a DaCHS-specific implementation).  We'll extend it on request.
	"""
	return WCSAxis.fromHeader(header, axisIndex, forceSeparable)

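# Sketch: for an axis with CRVAL1=4000, CRPIX1=1, CDELT1=2, pixel 11
# maps to the physical value 4000+(11-1)*2 = 4020:
#
#	ax = getWCSAxis(hdr, 1)
#	physVal = ax.pixToPhys(11)
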
class ESODescriptorsError(excs.SourceParseError):
	"""is raised when something goes wrong while parsing ESO descriptors.
	"""

def _makeNamed(name: str, re: str) -> str:
	# A helper to construct REs for the ESO parser.
	return "(?P<%s>%s)"%(name, re)

def _makeTypeScanner(
		typeName: str,
		literalRE: re.Pattern,
		constructor: Callable[[str], Any]):
	"""returns a scanner method for the _ESODescriptorsParser.

	This is a helper method for building the class; typeName must be
	the same as the name in _scan_name.  constructor is a function that
	turns string literals into objects of the desired type.
	"""
	def scanner(self) -> str:
		mat = literalRE.match(self.data, self.curPos)
		if not mat:
			raise ValueError("Expected a %s here"%typeName)
		self.curPos = mat.end()

		self.curCol.append(constructor(mat.group()))
		self.yetToRead -= 1
		if self.yetToRead==0:
			return "harvest"
		else:
			return typeName

	return scanner

class _ESODescriptorsParser:
	"""an ad-hoc parser for ESO's descriptors.

	These are sometimes in FITS files produced by MIDAS.  What I'm
	pretending to parse here is the concatenation of the cards' values
	without the boundaries.

	The parse is happening at construction time, after which you fetch
	the result in the result attribute.

	I'm adhoccing this.  If someone digs up the docs on what the actual
	grammar is, I'll do it properly.
	"""
	stringPat = "'[^']*'"
	intPat = r"-?\d+"
	floatPat = r"-?\d+\.\d+E[+-]\d+"
	white = r"\s+"
	nextToken = r"\s*"
	headerSep = nextToken+','+nextToken

	headerRE = re.compile(nextToken
		+_makeNamed("colName", stringPat)+headerSep
		+_makeNamed("typeCode", stringPat)+headerSep
		+_makeNamed("startInd", intPat)+headerSep
		+_makeNamed("endInd", intPat)+headerSep
		+stringPat+headerSep
		+stringPat+headerSep
		+stringPat)
	floatRE = re.compile(nextToken+floatPat)
	integerRE = re.compile(nextToken+intPat)

	def __init__(self, data: str):
		self.data, state = data, "header"
		self.result: Dict[str, List[float]] = {}
		self.curPos: int = 0
		while state!="end":
			try:
				state = getattr(self, "_scan_"+state)()
			except Exception as msg:
				raise ESODescriptorsError(str(msg),
					location="character %s"%self.curPos,
					offending=repr(self.data[self.curPos:self.curPos+20]))

	@classmethod
	def fromFITSHeader(cls, hdr: pyfits.Header) -> '_ESODescriptorsParser':
		"""returns a parser from the data in the pyfits hdr.
		"""
		descLines, collecting = [], False

		for card in hdr.cards:
			if card.keyword=="HISTORY":
				if " ESO-DESCRIPTORS END" in card.value:
					collecting = False
				if collecting:
					descLines.append(card.value)
				if " ESO-DESCRIPTORS START" in card.value:
					collecting = True

		return cls("\n".join(descLines))

	def _scan_header(self) -> str:
		"""reads the next descriptor header and returns a type name for it.
		"""
		mat = self.headerRE.match(self.data, self.curPos)
		if not mat:
			if not self.data[self.curPos:].strip():
				return "end"
			else:
				raise ValueError("Could not find next header")
		self.curPos = mat.end()

		self.curCol: List[float] = []
		self.curColName = mat.group("colName")[1:-1]
		self.yetToRead = int(mat.group("endInd"))-int(mat.group("startInd"))+1

		if mat.group("typeCode")[1:-1].startswith("R"):
			return "float"
		elif mat.group("typeCode")[1:-1].startswith("I"):
			return "integer"
		else:
			raise ValueError("Unknown type code %s"%mat.group("typeCode"))

	def _scan_harvest(self):
		"""enters a parsed column into result and prepares for the next column.
		"""
		self.result[self.curColName] = self.curCol
		del self.curCol
		del self.curColName
		del self.yetToRead
		return "header"

	_scan_float = _makeTypeScanner("float", floatRE, float)
	_scan_integer = _makeTypeScanner("integer", integerRE, int)

del _makeTypeScanner

def parseESODescriptors(hdr: pyfits.Header) -> Dict[str, List[float]]:
	"""returns parsed ESO descriptors from a pyfits header hdr.

	ESO descriptors are data columns stuck into FITS history lines.
	They were produced by MIDAS.  This implementation was made without
	actual documentation, is largely based on conjecture, and is certainly
	incomplete.

	What's returned is a dictionary mapping column keywords to lists of
	column values.
	"""
	return _ESODescriptorsParser.fromFITSHeader(hdr).result

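# Sketch (the column name is made up): the result maps descriptor names
# to plain python lists:
#
#	descs = parseESODescriptors(hdus[0].header)
#	# -> e.g. {"DATAMEAN": [1.0, 2.0]}
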
if __name__=="__main__":  # pragma: no cover
	import doctest
	doctest.testmod()