# Source code for gavo.formats.fitstable

"""
Writing data in FITS binary tables
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import os
import tempfile
import time

import numpy

from gavo import base
from gavo import rsc
from gavo import utils
from gavo.formats import common
from gavo.utils import pyfits
from gavo.votable import common as votcommon


_fitsCodeMap = {
	"short": "I",
	"int": "J",
	"long": "K",
	"float": "E",
	"double": "D",
	"boolean": "L",
	"char": "A",
	"unicodeChar": "A",
}

_typeConstructors = {
	"short": int,
	"int": int,
	"long": int,
	"float": float,
	"double": float,
	"boolean": int,
	"char": str,
	"unicodeChar": str,
}


def _makeCharArray(values, colInd, colDesc):
	"""returns a pyfits-capable column array for strings stored in the
	colInd-th column of values.

	This is for char-valued columns, i.e., ASCII-only.
	"""
	arr = numpy.array([(v[colInd] or "").encode("ascii", "replace")
		for v in values],
		dtype=numpy.bytes_)
	return "%dA"%arr.itemsize, arr


def _makeStringArray(values, colInd, colDesc):
	"""returns a pyfits-capable column array for strings stored in the
	colInd-th column of values.

	This is for unicodeChar-valued columns; we'd like to do something
	sensible with them; this implementation is FIXME, because we're
	essentially discaring all non-ASCII and write a normal char column.
	"""
# Yeah, right now this is identical to _makeCharArray, but that's FIXME.
	arr = numpy.array([(v[colInd] or "").encode("ascii", "replace")
		for v in values],
		dtype=numpy.bytes_)
	return "%dA"%arr.itemsize, arr


def _getNullValue(colDesc):
	"""returns a null value we consider ok for a column described by colDesc.

	This is mainly intended for array serialisation.  It is also
	used for column construction for integer types.
	"""
	length = votcommon.getLength(colDesc["arraysize"])
	if length and length>1:
		# TODO: figure out nullvalue interpretation in FITS arrays.
		return None

	declared = colDesc["nullvalue"]
	if declared is not None:
		# parse the declared null value with the column's type
		return _typeConstructors[colDesc["datatype"]](declared)

	# no declared null value: enter some reasonable defaults
	datatype = colDesc["datatype"]
	if datatype in ("float", "double"):
		return float("NaN")
	if datatype=="text":
		return ""
	return None


def _computeTypeCode(colDesc):
	"""returns a pair of (length, FITS type code) suitable for colDesc.
	"""
	code = _fitsCodeMap[colDesc["datatype"]]
	length = votcommon.getLength(colDesc["arraysize"])

	if length is None:
		# a variable-length array (other than a string).  We, in particular,
		# need this for polygons.
		return None, "P%s()"%code

	if length==1:
		# plain scalar column
		return 1, code

	# fixed-length array column
	return length, "%d%s"%(length, code)


def _makeValueArray(values, colInd, colDesc):
	"""returns a pyfits-capable column array for non-string values
	stored in the colInd-th column of values.

	The return value is a pair of (FITS type code, numpy array).
	NULLs are mapped to NaN-filled arrays for fixed-length array
	columns, and to the column's declared null value for scalars;
	a NULL in a scalar column without a declared null value raises
	a ValueError.
	"""
	length, typecode = _computeTypeCode(colDesc)

	if length is None:
		# That's a variable-length array that we'll fill from a
		# numpy object array.  Note: the numpy.object alias was
		# removed in numpy 1.24; the builtin object is the correct dtype.
		return typecode, numpy.array([list(v[colInd]) for v in values],
			dtype=object)

	elif length>1:
		# numpy 1:1.12.1-3 and pyfits from astropy 1.3-8 apparently
		# have trouble with masked arrays here, so we exploit
		# that in the VO and for floats NaN and NULL is treated equivalently.
		nullArray = numpy.array([float("NaN")]*length)
		def mkval(v):
			if v is None:
				return nullArray
			else:
				return numpy.array(v)

	else:
		# scalar column: substitute the declared null value for None
		# (bound as a default argument so it is computed only once)
		def mkval(v, nullValue=_getNullValue(colDesc)):
			if v is None:
				if nullValue is None:
					raise ValueError("While serializing a FITS table: NULL"
						" detected in column '%s' but no null value declared"%
						colDesc["name"])
				return nullValue
			else:
				return v

	arr = numpy.array([mkval(v[colInd]) for v in values])
	return typecode, arr


def _makeExtension(serMan):
	"""returns a pyfits hdu for the valuemappers.SerManager instance table.

	Besides the column data itself, this copies column metadata (unit,
	utype, description) into the header and, unless the table has an
	IgnoreTableParams attribute, serialises the table's params into
	header cards as well.
	"""
	values = list(serMan.getMappedTuples())
	columns = []
	utypes = []
	descriptions = []

	for colInd, colDesc in enumerate(serMan):
		descriptions.append(utils.defuseFileName(colDesc["description"], False))
		# choose the column-array builder based on the column datatype
		if colDesc["datatype"]=="char":
			makeArray = _makeCharArray
		elif colDesc["datatype"]=="unicodeChar":
			makeArray = _makeStringArray
		else:
			makeArray = _makeValueArray
		
		typecode, arr = makeArray(values, colInd, colDesc)
		# only pass on null values to integral types; FITS doesn't support
		# anything else, and I'll just leave Nones in non-integral columns
		# to astropy.io.fits -- with all the mess involved, in particular
		# that \0 and NULL or NaN and NULL can't be told apart in FITS.
		nullValue = None
		if typecode in 'BIJK':
			nullValue = _getNullValue(colDesc)
		
		columns.append(pyfits.Column(name=str(colDesc["name"]),
			unit=str(colDesc["unit"]), format=typecode,
			null=nullValue, array=arr))
		if colDesc["utype"]:
			utypes.append((colInd, str(colDesc["utype"].lower())))

	hdu = pyfits.BinTableHDU.from_columns(
		pyfits.ColDefs(columns))
	# utypes go into TUTYPn cards (FITS column indices are 1-based)
	for colInd, utype in utypes:
		hdu.header.set("TUTYP%d"%(colInd+1), utype)
	
	# column descriptions double as TTYPEn card comments and TCOMMn cards
	cards = hdu.header.cards
	for colInd, desc in enumerate(descriptions):
		cards["TTYPE%d"%(colInd+1)].comment = desc
		hdu.header.set("TCOMM%d"%(colInd+1), desc)

	if not hasattr(serMan.table, "IgnoreTableParams"):
		for param in serMan.table.iterParams():
			if param.value is None:
				continue
		
			key, value, comment = str(param.name), param.value, param.description
			if isinstance(value, str):
				value = utils.defuseFileName(value, False)
			if isinstance(comment, str):
				comment = utils.defuseFileName(comment, False)
			# FITS keywords are at most 8 chars; longer names need the
			# HIERARCH convention
			if len(key)>8:
				key = "hierarch "+key

			try:
				hdu.header.set(key, value=value, comment=comment)
			except ValueError as ex:
				# do not fail just because some header couldn't be serialised
				base.ui.notifyWarning(
					"Failed to serialise param %s to a FITS header (%s)"%(
						param.name,
						utils.safe_str(ex)))

	return hdu
	

def _makeFITSTableNOLOCK(dataSet, acquireSamples=True):
	"""returns a hdulist containing extensions for the tables in dataSet.

	You must make sure that this function is only executed once
	since pyfits is not thread-safe.
	"""
	primary = pyfits.PrimaryHDU()
	primary.header.set("DATE", time.strftime("%Y-%m-%d"),
		"Date file was written")

	hdus = [primary]
	for table in dataSet.tables.values():
		hdus.append(_makeExtension(
			base.SerManager(table, acquireSamples=acquireSamples)))
	return pyfits.HDUList(hdus)


def makeFITSTable(dataSet, acquireSamples=False):
	"""returns a hdulist containing extensions for the tables in dataSet.

	This function may block basically forever.  Never call this from
	the main server, always use threads or separate processes (until
	pyfits is fixed to be thread-safe).

	This will add table parameters as header cards on the resulting FITS
	header.
	"""
	# serialise all pyfits use through the global FITS lock
	with utils.fitsLock():
		return _makeFITSTableNOLOCK(dataSet, acquireSamples)
def writeFITSTableFile(hdulist):
	"""returns the name of a temporary file containing the FITS data for
	hdulist.
	"""
	handle, pathname = tempfile.mkstemp(".fits", prefix="fitstable",
		dir=base.getConfig("tempDir"))
	# we never write through the mkstemp handle (writeto opens the path
	# itself); close it right away so it cannot leak if writeto raises.
	os.close(handle)

	# if there's more than the primary HDU, EXTEND=True is mandatory; let's
	# be defensive here
	if len(hdulist)>1:
		hdulist[0].header.set("EXTEND", True, "More exts following")

	with utils.silence():
		# overwrite=True because mkstemp already created the (empty) file
		hdulist.writeto(pathname, overwrite=True)
	return pathname
def makeFITSTableFile(dataSet, acquireSamples=True):
	"""returns the name of a temporary file containing a fits file
	representing dataSet.

	The caller is responsible to remove the file.
	"""
	return writeFITSTableFile(
		makeFITSTable(dataSet, acquireSamples))
def writeDataAsFITS(data, outputFile, acquireSamples=False):
	"""a formats.common compliant data writer.

	This will write out table params as header cards.  To serialise
	those yourself (as is required for spectral data model compliant
	tables), set an attribute IgnoreTableParams (with an arbitrary
	value) on the table.
	"""
	data = rsc.wrapTable(data)
	fitsName = makeFITSTableFile(data, acquireSamples)
	try:
		# the with block makes sure src is closed even if cat raises
		with open(fitsName, "rb") as src:
			utils.cat(src, outputFile)
	finally:
		# the temporary file is always cleaned up
		os.unlink(fitsName)
# make the FITS serialiser available under the format key "fits"
# (media type application/fits, file extension .fits)
common.registerDataWriter("fits", writeDataAsFITS, "application/fits", "FITS Binary Table", ".fits")