Source code for gavo.utils.pgsphere

"""
Bindings for the pgsphere library and psycopg2.

Basically, once per program run, you need to call preparePgSphere(connection),
and you're done.

All native representation is in rad.
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.

from __future__ import annotations

import io
import math
import re
import tempfile

from astropy import units as u  # type: ignore
import numpy

from gavo.utils import codetricks
from gavo.utils import excs
from gavo.utils import mathtricks
from gavo.utils import misctricks
from gavo.utils import texttricks
from gavo.utils.fitstools import pyfits
from gavo.utils.mathtricks import DEG

from gavo.utils.dachstypes import (Any, Dict, List, Optional, Sequence,
	Union, Tuple, TYPE_CHECKING, cast)
if TYPE_CHECKING:
	Component = Union[float, u.Quantity]
	from astropy.coordinates import ( # type: ignore
		UnitSphericalRepresentation)
	


_TRAILING_ZEROES = re.compile("0+(\s|$)")
[docs]def removeTrailingZeroes(s: str) -> str: """remove zeroes in front of whitespace or the string end. This is used for cosmetics in STC-S strings. >>> removeTrailingZeroes("23.3420 21.2 12.00000") '23.342 21.2 12.' """ return _TRAILING_ZEROES.sub(r"\1", s)
[docs]class PgSAdapter(object): """A base class for objects adapting pgSphere objects. The all need a pgType attribute and two static methods ``_adaptToPgSphere(obj)`` and ``_castFromPgSphere(value, cursor)``. You must also define a sequence checkedAttributes; all attributes mentioned there must be equal for two adapted values to be equal (equality testing here really is mainly for unit tests with hand-crafted values). Also, all subclasses you should provide an asPoly returning a spherical polygon. This is used when uploading VOTables with REGION columns. In practice, you also want: :fromDALI(cls, daliSeq): a classmethod turning the DALI votable representation (typically, float arrays in degrees) into a corresponding object. :asDALI(self): the inverse of fromDALI :asSODA(self): returns a representation of the geometry as used in SODA parameters. """ pgType: Optional[str] = None checkedAttributes: List[str] = [] def __eq__(self, other: Any): if self.__class__!=other.__class__: return False for attName in self.checkedAttributes: if getattr(self, attName)!=getattr(other, attName): return False return True def __ne__(self, other: Any): return not self==other def __repr__(self): return "<pgsphere %s>"%self.asSTCS("Unknown")
[docs] def asPoly(self): raise ValueError("%s objects cannot be turned into polygons."% self.__class__)
[docs] def asSODA(self): """returns the "SODA-form" for this circle. This is a string containing blank-separated float literals; the floats are what asDALI returns, and hence this is built on top of asDALI. We don't worry about the fact that the coordinates just *might* be non-ICRS. """ return removeTrailingZeroes( " ".join("%.10f"%v for v in self.asDALI()))
[docs]class SPoint(PgSAdapter): """A point on a sphere from pgSphere. >>> SPoint(1, 1).asSTCS("ICRS") 'Position ICRS 57.2957795131 57.2957795131' >>> SPoint.fromDegrees(1, -1).asSTCS("ICRS") 'Position ICRS 1. -1.' """ pgType = "spoint" checkedAttributes = ["x", "y"] pattern = re.compile(r"\s*\(\s*([0-9.eNa-]+)\s*,\s*([0-9.eNa-]+)\s*\)") def __init__(self, x: Component, y: Component): if isinstance(x, u.Quantity) and isinstance(y, u.Quantity): self.x, self.y = x.to(u.rad).value, y.to(u.rad).value else: self.x, self.y = float(x), float(y) def __repr__(self) -> str: return "SPoint(%r, %r)"%(self.x, self.y) @staticmethod def _adaptToPgSphere(spoint: SPoint) -> AsIs: return AsIs("spoint '(%.10f,%.10f)'"%(spoint.x, spoint.y)) @classmethod def _castFromPgSphere(cls, value: Optional[str], cursor: Any ) -> 'Optional[SPoint]': if value is not None: return cls(*[float(v) for v in cls.pattern.match(value).groups()]) # type: ignore return None
[docs] @classmethod def fromDegreesP(cls, x: Component, y: Component) -> SPoint: """fromDegrees when it's certain that there's no None x and y. """ return cls(x*DEG, y*DEG)
[docs] @classmethod def fromDegrees(cls, x: Optional[Component], y: Optional[Component] ) -> Optional[SPoint]: if x is None or y is None: return None return cls.fromDegreesP(x, y)
[docs] @classmethod def fromUnitVector(cls, uvec: UnitSphericalRepresentation) -> SPoint: """returns an SPoint for a 3-unit sphere vector. """ return cls(*mathtricks.cartToSpher(uvec))
[docs] def asCooPair(self) -> Tuple[float, float]: """returns this point as (long, lat) in degrees. """ return (self.x/DEG, self.y/DEG)
[docs] def asSTCS(self, systemString: str) -> str: return removeTrailingZeroes( "Position %s %.10f %.10f"%(systemString, self.x/DEG, self.y/DEG))
[docs] def asPgSphere(self) -> str: return "spoint '(%.10f,%.10f)'"%(self.x, self.y)
[docs] def p(self) -> str: # helps below return "(%r, %r)"%(self.x, self.y)
[docs] def asUnitSphereVec(self) -> UnitSphericalRepresentation: """returns self as a triple of cx, cy, cz on the unit sphere. """ return mathtricks.spherToCart(self.x, self.y)
[docs] def asDALI(self) -> Tuple[float, float]: return self.asCooPair()
[docs] @classmethod def fromDALI(cls, coos: Sequence[Component]): return cls.fromDegrees(*coos)
[docs]class SCircle(PgSAdapter): """A spherical circle from pgSphere. The constructor accepts an SPoint center and a radius in rad. """ pgType = "scircle" checkedAttributes = ["center", "radius"] pattern = re.compile("<(\([^)]*\))\s*,\s*([0-9.e-]+)>") def __init__(self, center: SPoint, radius: Component): self.center, self.radius = center, float(radius) @staticmethod def _adaptToPgSphere(sc: SCircle): return AsIs("scircle '< %s, %r >'"%(sc.center.p(), sc.radius)) @classmethod def _castFromPgSphere(cls, value: Optional[str], cursor: Any ) -> Optional[SCircle]: if value is not None: pt, radius = cls.pattern.match(value).groups() # type: ignore if pt is not None: return cls(SPoint._castFromPgSphere(pt, cursor), radius) # type: ignore return None
[docs] def asDALI(self) -> Tuple[float, float, float]: """returns the "DALI-form" for this circle. This is an array containing the center coordinates and the radius in degrees. """ return (self.center.x/DEG, self.center.y/DEG, self.radius/DEG)
[docs] @classmethod def fromDALI(cls, daliSeq: Sequence[Optional[Component]] ) -> Optional[SCircle]: """returns a circle from its DALI float sequence. Any None in daliSeq makes this a None to help with VOTable null value handling. """ if None in daliSeq: return None ra, dec, radius = [float(s) for s in daliSeq] # type: ignore center = SPoint.fromDegreesP(ra, dec) return cls(center, radius*DEG)
[docs] @classmethod def fromPointSet(cls, points: Sequence[SPoint]) -> SCircle: """returns an SCircle covering all the SPoints in points. (max radius: pi/2). """ uvecs = [numpy.array(p.asUnitSphereVec()) for p in points] center = sum(uvecs)/len(uvecs) radius = max(center.dot(v) for v in uvecs) # type: ignore return cls( SPoint.fromUnitVector(center), math.acos(radius))
[docs] def asSTCS(self, systemString: str) -> str: return removeTrailingZeroes("Circle %s %s"%( systemString, self.asSODA()))
[docs] def asPgSphere(self) -> str: return "scircle '< (%.10f, %.10f), %.10f >'"%( self.center.x, self.center.y, self.radius)
[docs] def asPoly(self, nSegments: int=32) -> SPoly: # approximate the circle with 32 line segments and don't worry about # circles with radii larger than 90 degrees. # We compute the circle around the north pole and then rotate # the resulting points such that the center ends up at the # circle's center. r = math.sin(self.radius) innerOffset = math.cos(self.radius) rotationMatrix = mathtricks.getRotZ(math.pi/2-self.center.x )@mathtricks.getRotX(math.pi/2-self.center.y) points = [] for i in range(nSegments): angle = 2.*i*math.pi/nSegments dx, dy = r*math.sin(angle), r*math.cos(angle) points.append(SPoint( *mathtricks.cartToSpher( rotationMatrix@numpy.array([dx, dy, innerOffset])))) return SPoly(points)
[docs] def asSMoc(self, order: int=6, inclusive: bool=False): """returns an SMoc instance for this circle. order is the maximum order of the moc returned. """ import healpy # type: ignore pixels = healpy.query_disc(vec=self.center.asUnitSphereVec(), radius=self.radius, nside=2**order, nest=True, inclusive=inclusive) return SMoc.fromCells(order, pixels, maxOrder=order)
[docs]class SPoly(PgSAdapter): """A spherical polygon from pgSphere. The constructor accepts a list points of SPoints. """ pgType = "spoly" checkedAttributes = ["points"] pattern = re.compile("\([^)]+\)") def __init__(self, points: Sequence[SPoint]): self.points = tuple(points) @staticmethod def _adaptToPgSphere(spoly: SPoly) -> AsIs: return AsIs("spoly '{%s}'"%(", ".join(p.p() for p in spoly.points))) @classmethod def _castFromPgSphere(cls, value: Optional[str], cursor: Any ) -> Optional[SPoly]: if value is not None: return cls([SPoint._castFromPgSphere(ptLit, cursor) # type: ignore for ptLit in cls.pattern.findall(value)]) return None
[docs] def asDALI(self) -> List[float]: """returns the DALI form of this polygon. This is a sequence of floats of the vertex coordinates in degrees. """ res = [] for p in self.points: res.extend([p.x/DEG, p.y/DEG]) return res
[docs] @classmethod def fromDALI(cls, daliSeq: List[Component]) -> Optional[SPoly]: """returns a polygon from a DALI float-sequence This returns None for empty daliSeqs to help with VOTable NULL value handling. """ if not daliSeq: return None if len(daliSeq)<6 or len(daliSeq)%2: raise ValueError("Need an even-numbered number (>=6) of floats" " in a DALI polygon representation, got %s floats."%len(daliSeq)) return cls([SPoint.fromDegreesP(*tuple(p)) for p in misctricks.grouped(2, daliSeq)])
[docs] def asCooPairs(self) -> List[Tuple[float, float]]: """returns the vertices as a sequence of (long, lat) pairs in degrees. This form is required by some functions from base.coords. """ return [p.asCooPair() for p in self.points]
[docs] def asSTCS(self, systemString: str) -> str: return removeTrailingZeroes("Polygon %s %s"%(systemString, self.asSODA()))
[docs] def asPgSphere(self) -> str: return "spoly '{%s}'"%(",".join("(%.10f,%.10f)"%(p.x, p.y) for p in self.points))
[docs] def asPoly(self) -> SPoly: return self
[docs] def asSMoc(self, order: int=6, inclusive: bool=False) -> SMoc: """returns an SMoc instance for this polygon. order is the maximum order of the moc returned. """ import healpy vertices = [p.asUnitSphereVec() for p in self.points] pixels = healpy.query_polygon(vertices=vertices, nside=2**order, nest=True, inclusive=inclusive) return SMoc.fromCells(order, pixels, maxOrder=order)
[docs] def getCenter(self) -> List[float]: # This probably should return an SPoint, but let's first see where # this is used. """returns an estimate for some sort of center of this polygon. This is computed as the mean of the vertices (in unit sphere representation), which, depending on the polygon, may be very far from the polygon's center of mass. Also, you can kill this by having all points sit on a great circle, etc. So: handle with care. Doing this properly is hard. """ vertices = [numpy.array(mathtricks.spherToCart(p.x, p.y)) for p in self.points] center = numpy.average(vertices, axis=0) center = center/(center@center) return mathtricks.cartToSpher(center)
[docs]class SBox(PgSAdapter): """A spherical box from pgSphere. The constructor accepts the two corner points. """ pgType = "sbox" checkedAttributes = ["corner1", "corner2"] pattern = re.compile("\([^()]+\)") def __init__(self, corner1: SPoint, corner2: SPoint): self.corner1, self.corner2 = corner1, corner2 @staticmethod def _adaptToPgSphere(sbox: SBox) -> AsIs: return AsIs("sbox '(%s, %s)'"%(sbox.corner1.p(), sbox.corner2.p())) @classmethod def _castFromPgSphere(cls, value: Optional[str], cursor: Any ) -> Optional[SBox]: if value is not None: return cls(*[SPoint._castFromPgSphere(ptLit, cursor) # type: ignore for ptLit in cls.pattern.findall(value)]) return None
[docs] @classmethod def fromSIAPPars(cls, ra: float, dec: float, raSize: float, decSize: float ) -> SBox: """returns an SBox corresponding to what SIAP passes in. In particular, all values are in degrees, and a cartesian projection is assumed. This is for use with SIAP and tries to partially implement that silly prescription of "folding" over at the poles. If that happens, a TwoSBoxes exception is raised. It contains two SBoxes that should be ORed. I agree that sucks. Let's fix SIAP. """ if 90-abs(dec)<0.1: # Special handling at the pole raSize = 360 else: raSize = raSize/math.cos(dec*DEG) decSize = abs(decSize) # inhibit auto swapping of points minRA, maxRA = ra-raSize/2., ra+raSize/2. bottom, top = dec-decSize/2., dec+decSize/2. # folding over at the poles: raise an exception with two boxes, # and let upstream handle it. Foldover on both poles is not supported. # All this isn't really thought out and probably doesn't work in # many interesting cases. # I hate that folding over. if bottom<-90 and top>90: raise ValueError("Cannot fold over at both poles") elif bottom<-90: raise TwoSBoxes( cls( SPoint.fromDegreesP(minRA, -90), SPoint.fromDegreesP(maxRA, top)), cls( SPoint.fromDegreesP(180+minRA, -90), SPoint.fromDegreesP(180+maxRA, top))) elif top>90: raise TwoSBoxes( cls( SPoint.fromDegreesP(minRA, bottom), SPoint.fromDegreesP(maxRA, 90)), cls( SPoint.fromDegreesP(180+minRA, bottom), SPoint.fromDegreesP(180+maxRA, 90))) return cls(SPoint.fromDegreesP(minRA, bottom), SPoint.fromDegreesP(maxRA, top))
[docs] def asSTCS(self, systemString: str) -> str: return removeTrailingZeroes("PositionInterval %s %s %s"%(systemString, "%.10f %.10f"%(self.corner1.x/DEG, self.corner1.y/DEG), "%.10f %.10f"%(self.corner2.x/DEG, self.corner2.y/DEG)))
[docs] def asPoly(self) -> SPoly: x1, y1 = self.corner1.x, self.corner1.y x2, y2 = self.corner2.x, self.corner2.y minX, maxX = min(x1, x2), max(x1, x2) minY, maxY = min(y1, y2), max(y1, y2) return SPoly(( SPoint(minX, minY), SPoint(minX, maxY), SPoint(maxX, maxY), SPoint(maxX, minY)))
[docs] @classmethod def fromDALI(cls, daliSeq: Sequence[Component]) -> SBox: # see asDALI() return cls( SPoint.fromDALI(daliSeq[:2]), SPoint.fromDALI(daliSeq[2:]))
[docs] def asDALI(self) -> Tuple[float, float, float, float]: # we're cheating a bit here; there's no official DALI representation # yet. This is for our private xtype x-box return self.corner1.asDALI()+self.corner2.asDALI()
[docs]class TwoSBoxes(excs.ExecutiveAction): """is raised when an SBox is constructed from center and size such that it overlaps the pole. """ def __init__(self, box1: SBox, box2: SBox): self.box1, self.box2 = box1, box2
try: import pymoc # type: ignore except ImportError: # we're trying to work without pymoc, too pass
[docs]class SMoc(PgSAdapter): """a MOC with a pgsphere interface. The default constructor accepts a pymoc.MOC instance, which is accessible as the moc attribute. The database interface uses the ASCII serialisation, for which there's the fromASCII constructor. """ pgType = "smoc" checkedAttributes = ["moc"] def __init__(self, moc: pymoc.MOC): self.moc = moc if not hasattr(self.moc, "explicitMaxOrder"): self.moc.explicitMaxOrder = None _orderHeadRE = re.compile(r"(\d+)/") _rangeRE = re.compile(r"\s*(\d+)(?:-(\d+))?\s*$") @property def maxOrder(self) -> int: if self.moc.explicitMaxOrder is None: return self.moc.order else: return self.moc.explicitMaxOrder @classmethod def _parseCells(cls, cellLiteral: str, firstPos: int ) -> List[int]: """returns a sequence of cells from a MOC cell literal. firstPos is the string position of the beginning of cellLiteral for error messages. """ curPos = 0 cells = [] for item in re.split("[, \n\t]", cellLiteral): if not item.strip(): continue mat = cls._rangeRE.match(item) if not mat: raise ValueError("MOC literal syntax error at char %s"% (firstPos+curPos)) if mat.group(2) is None: cells.append(int(mat.group(1))) else: cells.extend(list(range(int(mat.group(1)), int(mat.group(2))+1))) curPos += len(item)+1 return cells
[docs] @classmethod def fromASCII(cls, literal: str) -> SMoc: """returns an SMoc from a quasi-standard ASCII serialisation. """ # we do the parsing ourselves -- the pymoc interface is too clumsy, # and the rigidity of the parser doesn't look good. seps = cast(List[re.Match], list(cls._orderHeadRE.finditer(literal))+[re.search("$", literal)]) if len(seps)==1: raise ValueError("No order separator visible in MOC literal '%s'"% texttricks.makeEllipsis(literal, 40)) if not re.match(r"\s*$", literal[:seps[0].start()]): raise ValueError("MOC literal '%s' does not start with order spec"% texttricks.makeEllipsis(literal, 40)) moc = pymoc.MOC() moc.explicitMaxOrder = None for openMat, closeMat in cast( # TODO: why do I need this cast? Sequence[Tuple[re.Match, re.Match]], codetricks.iterRanges(seps)): order = int(openMat.group(1)) cells = cls._parseCells( literal[openMat.end():closeMat.start()], openMat.end()) if cells: moc.add(order, cells) else: moc.explicitMaxOrder = order return cls(moc)
[docs] @classmethod def fromCells(cls, order: int, pixels: List[int], maxOrder: Optional[int]=None) -> SMoc: """returns a SMoc instance from a collection of pixels at order. Pass maxOrder to set an explicit max order for the resulting MOC. """ moc = pymoc.MOC(order=order, cells=pixels) moc.explicitMaxOrder = maxOrder moc.normalize() return cls(moc)
@staticmethod def _formatASCIIRange(minCell: int, maxCell: int) -> str: """returns a cell literal for a MOC. """ if minCell==maxCell: return str(minCell) else: return "%d-%d"%(minCell, maxCell)
[docs] def asASCII(self) -> str: """returns an ascii serialisation of this MOC. """ # this is essentially the pymoc code, but again saving the file # interface that we don't want here. parts = [] for order, cells in self.moc: ranges = [] rmin, rmax = -1, -1 for cell in sorted(cells): if rmin==-1: rmin = rmax = cell elif rmax==cell-1: rmax = cell else: ranges.append(self._formatASCIIRange(rmin, rmax)) rmin = rmax = cell ranges.append(self._formatASCIIRange(rmin, rmax)) parts.append("%d/%s"%(order, " ".join(ranges))) if self.moc.explicitMaxOrder is not None: parts.append(f"{self.moc.explicitMaxOrder}/") return " ".join(parts)
[docs] @classmethod def fromFITS(cls, literal: bytes) -> SMoc: """returns an SMoc from a string containing a FITS-serialised MOC. """ from pymoc.io.fits import read_moc_fits_hdu # type: ignore moc = pymoc.MOC() read_moc_fits_hdu(moc, pyfits.open(io.BytesIO(literal))[1]) return cls(moc)
@staticmethod def _adaptToPgSphere(smoc: SMoc) -> AsIs: return AsIs("smoc '%s'"%(smoc.asASCII())) @classmethod def _castFromPgSphere(cls, value: Optional[str], cursor: Any ) -> Optional[SMoc]: if value is not None: return cls.fromASCII(value) return None
[docs] def asPoly(self) -> SPoly: raise TypeError("MOCs cannot be represented as polygons")
[docs] def asSTCS(self, frame: str) -> str: # no STCS for MOCs, but this is really just for old-style VOTable # serialisation, so let's cheat return "MOC "+self.asASCII()
[docs] def asSMoc(self, order: int=6) -> SMoc: """returns a copy of self, normalised for order. """ moc = self.moc.copy() moc.normalize(order) if self.moc.explicitMaxOrder is not None: moc.explicitMaxOrder = min(self.moc.explicitMaxOrder, order) return self.__class__(moc)
[docs] def getPlot(self, **kwargs) -> bytes: """returns a png string with a plot visualising this moc. """ from pymoc.util.plot import plot_moc # type: ignore with tempfile.NamedTemporaryFile(suffix=".png") as f: plot_moc(self.moc, filename=f.name, projection="moll", **kwargs) return f.read()
[docs] def asFITS(self) -> bytes: """returns a standard FITS (table) representation of this MOC. """ from pymoc.io.fits import write_moc_fits with tempfile.NamedTemporaryFile(suffix=".fits") as f: write_moc_fits(self.moc, f) f.seek(0) return f.read()
[docs] def asDALI(self) -> str: """returns the string representation of this MOC. This isn't what DALI actually says at this point, but we suspect it will say that at some point. """ return self.asASCII()
[docs] @classmethod def fromDALI(cls, literal: str) -> SMoc: """returns an SMoc from a MOC string. (see asDALI) """ return cls.fromASCII(literal)
[docs] def asNUNIQs(self) -> List[int]: """returns a list of integers usable as nuniqs. """ res: List[int] = [] for order, cells in self.moc: shiftedOrder = 4 * (4 ** order) res.extend(pix+shiftedOrder for pix in cells) return res
try: import psycopg2 from psycopg2.extensions import (register_adapter, AsIs, register_type, new_type) if TYPE_CHECKING: from psycopg2.extensions import connection as Connection def _query(conn: Connection, query: str, pars: Optional[Dict[str, Any]]=None) -> List[Tuple]: c = conn.cursor() c.execute(query, pars) return list(c) _getPgSClass = codetricks.buildClassResolver( PgSAdapter, list(globals().values()), key=lambda obj: obj.pgType, default=PgSAdapter) # type: ignore
[docs] def preparePgSphere(conn: Connection) -> None: if hasattr(psycopg2, "_pgsphereLoaded"): # type: ignore return try: oidmap = _query(conn, "SELECT typname, oid" " FROM pg_type" " WHERE typname ~ " " '^s(point|trans|circle|line|ellipse|poly|path|box|moc)'") for typeName, oid in oidmap: cls = _getPgSClass(typeName) if cls is not PgSAdapter: # base class is null value register_adapter(cls, cls._adaptToPgSphere) register_type( new_type((oid,), "spoint", cls._castFromPgSphere)) psycopg2._pgsphereLoaded = True # type: ignore conn.commit() except: psycopg2._pgsphereLoaded = False # type: ignore
except ImportError: # pragma: no cover # psycopg2 not installed. Since preparsePgSphere can only be # called from code depending on psycopg2, there's no harm if # we don't define it. pass if __name__=="__main__": # pragma: no cover import doctest doctest.testmod()