# Source code for gavo.formats.texttable

"""
Writing data as plain text.

Currently, we only do TSV.  It would probably be nice to support "formatted
ASCII" as well, though that may be a bit tricky given that we do not
really store sane formatting hints for most columns.
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import io
import datetime

from gavo import base
from gavo import rsc
from gavo import stc
from gavo import utils
from gavo.formats import common
from gavo.utils import serializers

# A mapper function registry for formats directed at humans.
# The mapper factories defined in this module are added to it through
# registerDisplayMF; renderAsColumns hands the registry to base.SerManager.
displayMFRegistry = serializers.ValueMapperFactoryRegistry()
registerDisplayMF = displayMFRegistry.registerFactory

def _defaultMapperFactory(colDesc):
	"""returns the fallback mapper: None becomes "N/A", everything else
	is passed through str.

	colDesc is not inspected, so this factory never declines a column.
	"""
	def coder(val):
		return "N/A" if val is None else str(val)
	return coder
registerDisplayMF(_defaultMapperFactory)


# dbtype values treated as floating point by _sfMapperFactory
floatTypes = {"real", "float", "double", "double precision"}

def _sfMapperFactory(colDesc):
	"""returns a mapper printing floats with a fixed number of decimals.

	This only applies to columns with a dbtype in floatTypes that have a
	non-empty sf display hint; for all others, None is returned.
	The mapper turns None into "N/A".
	"""
	if colDesc["dbtype"] not in floatTypes:
		return None
	sf = colDesc["displayHint"].get("sf")
	if not sf:
		return None

	fmtStr = "%%.%df"%int(sf)
	def coder(val):
		return "N/A" if val is None else fmtStr%val
	return coder
registerDisplayMF(_sfMapperFactory)


def _hmsMapperFactory(colDesc):
	"""returns a mapper rendering values through utils.degToHms for
	columns with a display hint type of hms.

	The column must have the unit deg; as a side effect, its unit is
	changed to h:m:s.  The sepChar (default: a blank) and sf (default: 2)
	display hints are passed on to utils.degToHms.  For columns without
	the hms hint, None is returned and colDesc is left alone.
	"""
	hints = colDesc["displayHint"]
	if hints.get("type")!="hms":
		return None
	assert colDesc["unit"]=="deg", "hms display hint only works with deg so far"

	colDesc["unit"] = "h:m:s"
	sepChar = hints.get("sepChar", " ")
	sf = int(hints.get("sf", 2))

	def coder(val):
		return "N/A" if val is None else utils.degToHms(val, sepChar, sf)
	return coder
registerDisplayMF(_hmsMapperFactory)


def _dmsMapperFactory(colDesc):
	"""returns a mapper rendering values through utils.degToDms for
	columns with a display hint type of dms.

	As a side effect, the column's unit is changed to d:m:s; the incoming
	unit is not checked here.  The sepChar (default: a blank) and sf
	(default: 2) display hints are passed on to utils.degToDms.  For
	columns without the dms hint, None is returned.
	"""
	hints = colDesc["displayHint"]
	if hints.get("type")!="dms":
		return None

	colDesc["unit"] = "d:m:s"
	sepChar = hints.get("sepChar", " ")
	sf = int(hints.get("sf", 2))

	def coder(val):
		return "N/A" if val is None else utils.degToDms(val, sepChar, sf)
	return coder
registerDisplayMF(_dmsMapperFactory)


def _specUnitMapperFactory(colDesc):
	"""returns a mapper that converts between spectral units based
	on a spectralUnit displayHint.

	In contrast to _unitMapperFactory, this supports non-linear conversions.

	None is returned when there is no spectralUnit hint or when it equals
	the column's unit.  Otherwise, the column's unit is set to the
	spectral unit and the mapper is compiled from generated source.
	"""
	targetUnit = colDesc["displayHint"].get("spectralUnit")
	if not targetUnit or targetUnit==colDesc["unit"]:
		return None

	# the format goes into the generated source as a quoted literal
	quotedFormat = "'%%.%df'"%int(colDesc["displayHint"].get("sf", 2))
	# fill the name of coder's argument into the conversion expression
	convExpr = base.getSpecExpr(colDesc["unit"], targetUnit).format("val")
	source = "\n".join([
		"def coder(val):",
		"  if val is None:",
		"    return 'N/A'",
		"  else:",
		f"    return {quotedFormat}%({convExpr})"])

	colDesc["unit"] = targetUnit
	return utils.compileFunction(source, "coder")


registerDisplayMF(_specUnitMapperFactory)


def _unitMapperFactory(colDesc):
	"""returns a mapper that converts between units for fields that have
	a displayUnit displayHint.

	None is returned if there is no displayUnit hint, if it equals the
	column's unit, or if base.computeConversionFactor raises BadUnit (in
	which case an error is reported through base.ui).  Otherwise, the
	column's unit is set to the display unit, and values (or, for array
	dbtypes, their items) are multiplied by the conversion factor and
	formatted with sf (default: 2) decimals.

	The stuff done here has to be done for all factories handling unit-based
	floating point values.  Maybe we want to do "decorating" meta-factories?
	"""
	displayUnit = colDesc["displayHint"].get("displayUnit")
	if not displayUnit or displayUnit==colDesc["unit"]:
		return None

	try:
		factor = base.computeConversionFactor(colDesc["unit"], displayUnit)
	except base.BadUnit:
		# bad unit somewhere; ignore display hint
		base.ui.notifyError("Bad unit while computing conversion factor.")
		return None

	colDesc["unit"] = displayUnit
	fmtStr = "%%.%df"%int(colDesc["displayHint"].get("sf", 2))

	def formatScaled(item):
		return "N/A" if item is None else fmtStr%(item*factor)

	if "[" not in colDesc["dbtype"]:
		# scalar column
		return formatScaled

	def coder(val):
		if val is None:
			return "N/A"
		return "[%s]"%", ".join(map(formatScaled, val))
	return coder
registerDisplayMF(_unitMapperFactory)


def _stringWrapMF(baseMF):
	"""returns a factory that stringifies floats and makes N/A from
	Nones coming out of baseMF and passes everything else through.

	Floats are formatted with as many decimals as the column's sf display
	hint asks for; without such a hint, they are formatted using %s.

	The returned factory yields None for columns for which baseMF does
	not return a mapper.
	"""
	def factory(colDesc):
		handler = baseMF(colDesc)
		if not handler:
			# baseMF declined this column, and so do we
			return None

		sf = colDesc["displayHint"].get("sf", None)
		# "%s" is only the fallback; it must not clobber an sf-based format
		fmtstr = "%%.%df"%int(sf) if sf else "%s"

		def realHandler(val):
			res = handler(val)
			if res is None:
				return "N/A"
			if isinstance(res, float):
				return fmtstr%res
			return res
		return realHandler
	return factory

registerDisplayMF(_stringWrapMF(stc.datetimeMapperFactory))


def humanDatesFactory(colDesc):
	"""returns a mapper formatting date and timestamp columns according
	to a humanDate or humanDay display hint.

	humanDate yields YYYY-MM-DD HH:MM:SS, humanDay just YYYY-MM-DD.  For
	columns with another (or no) display hint type or with a dbtype other
	than date or timestamp, None is returned and colDesc is left alone.

	As a side effect, a matching column is declared as a string-valued
	timestamp (datatype char, arraysize *, xtype timestamp, empty unit).
	This happens when the mapper is created rather than when the first
	non-null value is mapped, so the metadata does not depend on whether
	rows have already been serialised.

	The mapper turns None into "N/A".
	"""
	formats = {
		"humanDate": "%Y-%m-%d %H:%M:%S",
		"humanDay": "%Y-%m-%d"}
	fmt = formats.get(colDesc["displayHint"].get("type"))
	if fmt is None or colDesc["dbtype"] not in ("date", "timestamp"):
		return None

	colDesc["datatype"], colDesc["arraysize"] = "char", "*"
	colDesc["xtype"] = "timestamp"
	colDesc["unit"] = ""

	def coder(val):
		if val is None:
			return "N/A"
		try:
			return val.strftime(fmt)
		except ValueError:
			# probably too old a date, fall back to a hack
			return val.isoformat()
	return coder
registerDisplayMF(humanDatesFactory)
def humanTimesFactory(colDesc):
	"""returns a mapper formatting times as HH:MM:SS for columns with a
	display hint type of humanTime.

	The sf display hint (default: 0) gives the number of decimals shown
	for the seconds.  Fractions are truncated rather than rounded so the
	seconds field never reads 60.

	The mapper accepts datetime.time, datetime.datetime, and
	datetime.timedelta values.  For timedeltas, only the seconds and
	microseconds attributes are used, i.e., the days part is ignored.
	None becomes "N/A"; values of any other type are mapped to None.

	For columns without the humanTime hint, None is returned.
	"""
	if colDesc["displayHint"].get("type")!="humanTime":
		return None
	sf = int(colDesc["displayHint"].get("sf", 0))

	def formatHMS(hours, minutes, seconds, microseconds):
		res = "%02d:%02d:%02d"%(hours, minutes, seconds)
		if sf>0:
			# work on the zero-padded microseconds to avoid float rounding
			res += "."+("%06d"%microseconds).ljust(sf, "0")[:sf]
		return res

	def coder(val):
		if val is None:
			return "N/A"
		if isinstance(val, (datetime.time, datetime.datetime)):
			return formatHMS(val.hour, val.minute, val.second, val.microsecond)
		if isinstance(val, datetime.timedelta):
			minutes, seconds = divmod(val.seconds, 60)
			hours, minutes = divmod(minutes, 60)
			return formatHMS(hours, minutes, seconds, val.microseconds)
		return None
	return coder
registerDisplayMF(humanTimesFactory)
def _unixToDateTime(val):
	"""returns a naive datetime in UTC for the unix timestamp val.

	This does what the deprecated datetime.utcfromtimestamp did.
	"""
	return datetime.datetime.fromtimestamp(
		val, datetime.timezone.utc).replace(tzinfo=None)


def jdMapperFactory(colDesc):
	"""maps JD, MJD, unix timestamp, and julian year columns to
	human-readable datetimes.

	This applies to double precision and real columns with a display hint
	type of humanDate.  The converter is chosen by the column's unit:
	d (MJD if stc.isMJD says so for colDesc.original, JD otherwise),
	s (unix timestamp), or yr (julian year).  For all other columns, None
	is returned and colDesc is left alone.

	As a side effect, a matching column is declared as a string-valued
	timestamp (datatype char, arraysize *, xtype timestamp, empty unit).

	The mapper turns None into "N/A" and formats everything else with
	utils.formatISODT.
	"""
	if colDesc["displayHint"].get("type")!="humanDate":
		return None
	if colDesc["dbtype"] not in ("double precision", "real"):
		return None

	unit = colDesc["unit"]
	if unit=="d":
		if stc.isMJD(colDesc.original):
			converter = stc.mjdToDateTime
		else:
			converter = stc.jdnToDateTime
	elif unit=="s":
		converter = _unixToDateTime
	elif unit=="yr":
		converter = stc.jYearToDateTime
	else:
		return None

	def fun(val):
		if val is None:
			return "N/A"
		return utils.formatISODT(converter(val))

	colDesc["datatype"], colDesc["arraysize"] = "char", "*"
	colDesc["xtype"] = "timestamp"
	colDesc["unit"] = ""
	return fun
registerDisplayMF(jdMapperFactory)


def _sizeMapperFactory(colDesc):
	"""is a factory for formatters for file sizes and similar.

	It applies to columns with the unit byte; values are formatted by
	utils.formatSize, which also receives the sf display hint (default: 1).
	None becomes "N/A".  For columns with other units, None is returned.
	"""
	if colDesc["unit"]!="byte":
		return None
	sf = int(colDesc["displayHint"].get("sf", 1))

	def coder(val):
		if val is None:
			return "N/A"
		return utils.formatSize(val, sf)
	return coder
registerDisplayMF(_sizeMapperFactory)


registerDisplayMF(serializers._pgSphereMapperFactory)


def _makeString(val):
	"""returns a string representation of val for text output.

	None becomes "N/A"; strings and bytes are rendered through repr
	(without the quotes and the b prefix); lists and tuples become their
	blank-separated items in square brackets; everything else goes
	through str.
	"""
	if val is None:
		return "N/A"
	if isinstance(val, str):
		# a cheaty way of making sure we get hex escapes for non-printable stuff
		return repr(val)[1:-1]
	if isinstance(val, bytes):
		# bytes reprs have a leading b in front of the opening quote
		return repr(val)[2:-1]
	if isinstance(val, (list, tuple)):
		return "[%s]"%" ".join("%s"%v for v in val)
	return str(val)
def renderAsColumns(table, target, acquireSamples=False):
	"""writes a fixed-column representation of table to target.

	If table is an rsc.Data instance, its primary table is rendered.
	Values are mapped using displayMFRegistry, stringified, and laid out
	by utils.formatSimpleTable; the result is written to target in a
	single write, passed through utils.bytify.
	"""
	if isinstance(table, rsc.Data):
		table = table.getPrimaryTable()
	sm = base.SerManager(table, acquireSamples=acquireSamples,
		mfRegistry=displayMFRegistry)
	rows = (
		(_makeString(s) for s in row)
		for row in sm.getMappedTuples())
	target.write(utils.bytify(utils.formatSimpleTable(rows)))
def renderAsText(table, target, acquireSamples=True):
	"""writes a text (TSV) rendering of table to the file target.

	If table is an rsc.Data instance, its primary table is rendered.
	Each row becomes one line of tab-separated, stringified values,
	written to target after passing through utils.bytify.  No mfRegistry
	is passed to base.SerManager here, so displayMFRegistry is not
	explicitly used for this format.
	"""
	if isinstance(table, rsc.Data):
		table = table.getPrimaryTable()
	sm = base.SerManager(table, acquireSamples=acquireSamples)
	for row in sm.getMappedTuples():
		line = "\t".join([_makeString(s) for s in row])+"\n"
		target.write(utils.bytify(line))
def getAsText(data):
	"""returns the TSV rendering of data as a string.

	This runs renderAsText into an in-memory buffer and decodes what was
	written there as utf-8.
	"""
	target = io.BytesIO()
	renderAsText(data, target)
	return target.getvalue().decode("utf-8")
def readTSV(inFile):
	"""returns a list of tuples for a tab-separated-values file.

	inFile is any iterable yielding the lines as strings.

	Lines starting with # and lines containing only whitespace are ignored.
	Whitespace at front and back is stripped (which happens before the
	check for #, and also removes leading and trailing tabs).

	No checks are done at this point, i.e., the tuples could be of varying
	lengths.
	"""
	stripped = (ln.strip() for ln in inFile)
	return [tuple(ln.split("\t"))
		for ln in stripped
		if ln and not ln.startswith("#")]
# NOTE: This will only serialize the primary table.
common.registerDataWriter("tsv", renderAsText, "text/tab-separated-values",
	"Tab separated values", ".tsv")
common.registerDataWriter("txt", renderAsColumns, "text/plain",
	"Fixed-column plain text", ".txt")