
"""
Various helpers that didn't fit into any other xTricks.
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.

import base64
import collections
import functools
import io
import os
import re
import struct
import time
import threading
import urllib.request as urlrequest
import zlib

from docutils import core as rstcore
from docutils import nodes
from docutils import utils as rstutils
from docutils.parsers.rst import roles
from docutils.parsers.rst import directives
from docutils.parsers.rst.states import Inliner

from gavo.utils import excs

from gavo.utils.dachstypes import (Any, BinaryIO, Callable, Dict,
	Filename, Generator, Generic, List, Optional, Sequence, StrOrBytes,
	StrToStrMap, Tuple, Type, TypeVar, Union, cast)


BIBCODE_PATTERN = re.compile(r"[012]\d\d\d\w[^ ]{14}$")


def couldBeABibcode(s: str) -> bool:
	"""returns true if we think that the string s is a bibcode.

	This is based on matching against BIBCODE_PATTERN.
	"""
	return bool(BIBCODE_PATTERN.match(s))


_RSTRoleReturnType = Tuple[List[nodes.Node], List[str]]
_RSTRoleFunction = Callable[[str, str, str, int, Inliner,
	Optional[dict], Optional[List[str]]], _RSTRoleReturnType]


class RSTExtensions:
	"""a register for local RST extensions.

	This is for both directives and interpreted text roles.

	We need these as additional markup in examples; these always introduce
	local rst interpreted text roles, which always add some class to the
	node in question (modifications are possible).

	These classes are then changed to properties as the HTML fragments
	from RST translation are processed by the _Example nevow data factory.

	To add a new text role, say::

		RSTExtensions.makeTextRole(roleName, roleFunc=None)

	You can pass in a full role function as discussed in
	...docs/howto/rst-roles.html#define-the-role-function

	It must, however, add a dachs-ex-<roleName> class to the node.  The
	default function produces a nodes.emphasis item with the proper class.

	In a pinch, you can pass a propertyName argument to makeTextRole if
	the desired property name is distinct from the role name in the RST.
	This is used by tapquery and taprole since we didn't want to change
	our examples when the standard changed.

	To add a directive, say::

		RSTExtensions.addDirective(dirName, dirClass)

	In HTML, these classes become properties named like the role name
	(except you can again use propertyName in a pinch).
	"""
	classToProperty: Dict[str, str] = {}

	@classmethod
	def addDirective(cls, name: str, implementingClass: type,
			propertyName: Optional[str]=None) -> None:
		directives.register_directive(name, implementingClass)
		cls.classToProperty["dachs-ex-"+name] = propertyName or name

	@classmethod
	def makeTextRole(cls, roleName: str,
			roleFunc: Optional[_RSTRoleFunction]=None,
			propertyName=None) -> None:
		"""creates a new text role for roleName.

		See class docstring.
		"""
		if roleFunc is None:
			roleFunc = cls._makeDefaultRoleFunc(roleName)
		roles.register_local_role(roleName, roleFunc)
		cls.classToProperty["dachs-ex-"+roleName] = propertyName or roleName

	@classmethod
	def _makeDefaultRoleFunc(cls, roleName: str) -> _RSTRoleFunction:
		"""returns an RST interpreted text role parser function returning
		an emphasis node with a dachs-ex-roleName class.
		"""
		def roleFunc(name, rawText, text, lineno, inliner,
				options={}, content=[]):
			node = nodes.emphasis(rawText, text)
			node["classes"] = ["dachs-ex-"+roleName]
			return [node], []
		return roleFunc
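

# A minimal usage sketch (not part of the original module): register a
# local text role whose HTML property name differs from the role name;
# "myrole" and "my-prop" are made-up illustrations.
def _exampleMakeTextRole(): # pragma: no cover
	RSTExtensions.makeTextRole("myrole", propertyName="my-prop")
	# :myrole:`stuff` in example RST now yields an emphasis node with
	# class dachs-ex-myrole, which later turns into the my-prop property.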


# Generally useful RST extensions (for roles useful in examples,
# see examplesrender)

def _bibcodeRoleFunc(name: str,
		rawText: str,
		text: str,
		lineno: int,
		inliner: Inliner,
		options: Optional[dict]={},
		content: Optional[List[str]]=[]) -> _RSTRoleReturnType:
	if not couldBeABibcode(text):
		raise ValueError("Probably not a bibcode: '%s'"%text)
	node = nodes.reference(rawText, text,
		refuri="http://adsabs.harvard.edu/abs/%s"%text)
	node["classes"] = ["bibcode-link"]
	return [node], []

RSTExtensions.makeTextRole("bibcode", _bibcodeRoleFunc)
del _bibcodeRoleFunc


# RST extensions for documentation writing

_explicitTitleRE = re.compile(r'^(.+?)\s*(?<!\x00)<(.*?)>$', re.DOTALL)

def _dachsdocRoleFunc(name: str,
		rawText: str,
		text: str,
		lineno: int,
		inliner: Inliner,
		options: Optional[dict]={},
		content: Optional[List[str]]=[]) -> _RSTRoleReturnType:
	# inspired by sphinx extlinks
	text = rstutils.unescape(text)
	mat = _explicitTitleRE.match(text)
	if mat:
		title, url = mat.groups()
	else:
		title, url = text.split("/")[-1], text
	url = "http://docs.g-vo.org/DaCHS/"+url
	return [nodes.reference(title, title, internal=False, refuri=url)], []

RSTExtensions.makeTextRole("dachsdoc", _dachsdocRoleFunc)
del _dachsdocRoleFunc


def _dachsrefRoleFunc(name: str,
		rawText: str,
		text: str,
		lineno: int,
		inliner: Inliner,
		options: Optional[dict]={},
		content: Optional[List[str]]=[]) -> _RSTRoleReturnType:
	# this will guess a link into the ref documentation
	text = rstutils.unescape(text)
	fragId = re.sub("[^a-z0-9]+", "-", text.lower())
	url = "http://docs.g-vo.org/DaCHS/ref.html#"+fragId
	return [nodes.reference(text, text, internal=False, refuri=url)], []

RSTExtensions.makeTextRole("dachsref", _dachsrefRoleFunc)
del _dachsrefRoleFunc


def _samplerdRoleFunc(name: str,
		rawText: str,
		text: str,
		lineno: int,
		inliner: Inliner,
		options: Optional[dict]={},
		content: Optional[List[str]]=[]) -> _RSTRoleReturnType:
	# this will turn into a link to a file in the GAVO svn
	# (usually for RDs)
	text = rstutils.unescape(text)
	url = "http://svn.ari.uni-heidelberg.de/svn/gavo/hdinputs/"+text+".rd"
	return [nodes.reference(text, text, internal=False, refuri=url)], []

RSTExtensions.makeTextRole("samplerd", _samplerdRoleFunc)
del _samplerdRoleFunc


class _UndefinedType(type):
	"""the metaclass for Undefined.

	Used internally.
	"""
	def __str__(cls):
		raise excs.StructureError("%s cannot be stringified."%cls.__name__)

	def __repr__(cls):
		return "<Undefined>"

	def __bool__(cls):
		return False


class Undefined(metaclass=_UndefinedType):
	"""a sentinel for all kinds of undefined values.

	Do not instantiate.

	>>> Undefined()
	Traceback (most recent call last):
	TypeError: Undefined cannot be instantiated.
	>>> bool(Undefined)
	False
	>>> repr(Undefined)
	'<Undefined>'
	>>> str(Undefined)
	Traceback (most recent call last):
	gavo.utils.excs.StructureError: Undefined cannot be stringified.
	"""
	def __init__(self):
		raise TypeError("Undefined cannot be instantiated.")


class RateLimiter:
	"""A class that helps limit rates of events.

	You construct it with a timeout (in seconds) and then protect things
	you want to rate-limit with "if rl.inDeadtime(key): skip".

	The key is an identifier for what it is that you want to limit (e.g.,
	the sort of an event, so that different events can share a rate
	limiter).

	If you have many events that usually need rate limiting, you'd have
	to revisit this implementation -- this is really for when rate
	limiting is the exception.
	"""
	def __init__(self, timeout: float):
		self.timeout = timeout
		self.lastEvents: Dict[str, float] = {}

	def inDeadtime(self, key: str):
		now = time.time()
		# no reason to have this work in 1970
		if self.lastEvents.get(key, 0)+self.timeout>now:
			return True
		self.lastEvents[key] = now
		return False
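

# A minimal usage sketch (not part of the original module): let at most
# one notification through per minute; the key "db-trouble" is a made-up
# event name.
def _exampleRateLimiter(): # pragma: no cover
	rl = RateLimiter(60)
	for _ in range(3):
		if rl.inDeadtime("db-trouble"):
			continue # rate-limited: skip the expensive notification
		print("database trouble") # reached at most once per minute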


@functools.total_ordering
class QuotedName:
	"""A string-like thing basically representing SQL delimited identifiers.

	This has some features that make handling these relatively painless
	in ADQL code.

	The most horrible feature is that these hash and compare as their
	embedded names, except to other QuotedNames.

	SQL-92, in 5.2, roughly says: delimited identifiers compare literally
	with each other; delimited identifiers compare with regular
	identifiers after the latter are all turned to upper case.  But since
	postgres turns everything to lower case, we do so here, too.

	>>> n1, n2, n3 = QuotedName("foo"), QuotedName('foo"l'), QuotedName("foo")
	>>> n1==n2,n1==n3,hash(n1)==hash("foo")
	(False, True, True)
	>>> print(n1, n2)
	"foo" "foo""l"
	>>> "Foo"<n1, n1>"bar"
	(False, True)
	>>> QuotedName('7oh-no"+rob').makeIdentifier()
	'id7oh2dno222brob'
	"""
	def __init__(self, name: str):
		self.name = name

	def __hash__(self) -> int:
		return hash(self.name)

	def __eq__(self, other: Any) -> bool:
		if isinstance(other, QuotedName):
			return self.name==other.name
		elif isinstance(other, str):
			return self.name==other.lower()
		else:
			return False

	def __lt__(self, other: Any) -> bool:
		if isinstance(other, QuotedName):
			return self.name<other.name
		elif isinstance(other, str):
			return self.name<other.lower()
		else:
			return False

	def __str__(self) -> str:
		return '"%s"'%(self.name.replace('"', '""'))

	def __repr__(self) -> str:
		return 'QuotedName(%s)'%repr(self.name)

	def isRegularLower(self) -> bool:
		return not not re.match("[a-z][a-z0-9_]*$", self.name)

	def lower(self):
		# service to ADQL name resolution
		return self

	def flatten(self) -> str:
		# ADQL query serialization
		return str(self)

	def capitalize(self) -> str:
		# service for table head and such
		return self.name.capitalize()

	def makeIdentifier(self) -> str:
		"""returns self as something usable as a SQL regular identifier.

		This will be rather unreadable if there's a substantial number of
		non-letters in there, and of course there's no absolute guarantee
		this doesn't clash with actual identifiers.

		This is *not* for SQL serialisation but mainly for generating
		sqlKey, where this kind of thing ends up in %(name)s patterns.
		"""
		id = re.sub("[^a-zA-Z0-9]",
			lambda mat: "%x"%ord(mat.group(0)),
			self.name)
		if not re.match("[a-zA-Z]", id):
			id = "id"+id
		return id

	def __add__(self, other: str):
		# for disambiguateColumns
		return QuotedName(self.name+other)


_StreamData = TypeVar('_StreamData', bytes, str)


class StreamBuffer(Generic[_StreamData]):
	"""a buffer that takes data in arbitrary chunks and returns them
	in chops of chunkSize bytes.

	There's a lock in place so you can access add and get from different
	threads.

	When everything is written, you must call doneWriting.
	"""
	chunkSize = 50000

	def __init__(self, chunkSize: Optional[int]=None, binary: bool=True):
		self.buffer: collections.deque = collections.deque()
		if chunkSize is not None:
			self.chunkSize = chunkSize
		self.curSize = 0
		self.lock = threading.Lock()
		self.finished = False
		# annotation problem: understand how to make mypy grok this
		self.joiner: _StreamData = b"" if binary else "" # type: ignore

	def add(self, data: _StreamData) -> None:
		with self.lock:
			self.buffer.append(data)
			self.curSize += len(data)

	def get(self, numBytes: Optional[int]=None) -> Optional[_StreamData]:
		if numBytes is None:
			numBytes = self.chunkSize

		if self.curSize<numBytes and not self.finished:
			return None
		if not self.buffer:
			return None

		with self.lock:
			items, sz = [], 0
			# collect items till we've got a chunk
			while self.buffer:
				item = self.buffer.popleft()
				sz += len(item)
				self.curSize -= len(item)
				items.append(item)
				if sz>=numBytes:
					break

			# make a chunk and push back what we didn't need
			chunk = self.joiner.join(items)
			leftOver = chunk[numBytes:]
			if leftOver:
				self.buffer.appendleft(leftOver)
				self.curSize += len(leftOver)
			chunk = chunk[:numBytes]

		return chunk

	# XXX TODO: refactor get and getToChar to use as much common code
	# as sensible

	def getToChar(self, char: _StreamData) -> Optional[_StreamData]:
		"""returns the buffer up to the first occurrence of char.

		If char is not present in the buffer, the function returns None.
		"""
		with self.lock:
			items, sz = [], 0
			# collect items till we've got our character
			while self.buffer:
				item = self.buffer.popleft()
				sz += len(item)
				self.curSize -= len(item)
				items.append(item)
				if char in item:
					break
			else:
				# didn't break out of the loop, i.e., no char found;
				# items now contains the entire buffer.
				self.buffer.clear()
				self.buffer.append(self.joiner.join(items))
				self.curSize = sz
				return None

			# char is in the last element of items
			items[-1], leftOver = items[-1].split(char, 1)
			chunk = self.joiner.join(items)
			if leftOver:
				self.buffer.appendleft(leftOver)
				self.curSize += len(leftOver)
			return chunk+char

		raise AssertionError("This cannot happen") # pragma: no cover

	def getRest(self) -> _StreamData:
		"""returns whatever is left in the buffer.
		"""
		result = self.joiner.join(self.buffer)
		self.buffer = collections.deque()
		return result

	def doneWriting(self) -> None:
		self.finished = True
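

# A minimal usage sketch (not part of the original module): data added in
# odd-sized pieces comes back in chunkSize chops.
def _exampleStreamBuffer(): # pragma: no cover
	buf = StreamBuffer(chunkSize=4)
	buf.add(b"abcdef")
	buf.add(b"gh")
	buf.doneWriting()
	assert buf.get()==b"abcd"
	assert buf.get()==b"efgh"
	assert buf.get() is None

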
_T = TypeVar("_T")


def grouped(n: int, seq: Sequence[_T]) -> List[List[_T]]:
	"""returns items of seq in groups of n elements.

	If len(seq)%n!=0, the last elements are discarded.

	>>> list(grouped(2, range(5)))
	[(0, 1), (2, 3)]
	>>> list(grouped(3, range(9)))
	[(0, 1, 2), (3, 4, 5), (6, 7, 8)]
	"""
	# annotation problem: understand why mypy doesn't understand this
	return list(zip(*([iter(seq)]*n))) # type: ignore


# annotation problem: https://github.com/python/mypy/issues/3737
def getfirst(args: Dict[str, _T], key: str, default: _T=Undefined) -> _T: # type: ignore
	"""returns the first value of key in the web argument-like object args.

	args is a dictionary mapping keys to lists of values.  If key is
	present, the first element of the list is returned; if the key is
	missing or the list is empty, default is returned if given.
	Otherwise, a ValidationError for the requested column is raised.

	Finally, if args[key] is neither list nor tuple (in an isinstance
	sense), it is returned unchanged.

	>>> getfirst({'x': [1,2,3]}, 'x')
	1
	>>> getfirst({'x': []}, 'x')
	Traceback (most recent call last):
	gavo.utils.excs.ValidationError: Field x: Missing mandatory parameter x
	>>> getfirst({'x': []}, 'y')
	Traceback (most recent call last):
	gavo.utils.excs.ValidationError: Field y: Missing mandatory parameter y
	>>> print(getfirst({'x': []}, 'y', None))
	None
	>>> getfirst({'x': 'abc'}, 'x')
	'abc'
	"""
	try:
		val = args[key]
		if isinstance(val, (list, tuple)):
			return val[0]
		else:
			return val
	except (KeyError, IndexError):
		if default is Undefined:
			raise excs.ValidationError("Missing mandatory parameter %s"%key,
				colName=key)
		return default


def sendUIEvent(eventName: str, *args) -> None:
	"""sends an eventName to the DC event dispatcher.

	If no event dispatcher is available, do nothing.

	The base.ui object that DaCHS uses for event dispatching is only
	available to sub-packages above base.  Other code should not use or
	need it under normal circumstances, but if it does, it can use this.
	All other code should use ``base.ui.notify<eventName>(*args)``
	directly.
	"""
	try:
		from gavo.base import ui
		getattr(ui, "notify"+eventName)(*args)
	except ImportError:
		pass


def logOldExc(exc: Exception) -> Exception:
	"""logs the mutation of the currently handled exception to exc.

	This just does a notifyExceptionMutation using sendUIEvent; it should
	only be used by code at or below base.
	"""
	sendUIEvent("ExceptionMutation", exc)
	return exc


def getFortranRec(f: BinaryIO) -> Optional[bytes]:
	"""reads a "fortran record" from f and returns the payload.

	A "fortran record" comes from an unformatted file and has a 4-byte
	payload length before and after the payload.  Native endianness is
	assumed here.

	If the two length specs do not match, a ValueError is raised.  Of
	course, f must be open in binary mode.
	"""
	try:
		startPos: Union[int, str] = f.tell()
	except IOError:
		startPos = "(stdin)"

	rawLength = f.read(4)
	if rawLength==b'': # EOF
		return None
	recLen = struct.unpack("i", rawLength)[0]
	data = f.read(recLen)

	rawPost = f.read(4)
	if not rawPost:
		raise ValueError("Record starting at %s has no postamble"%startPos)
	postambleLen = struct.unpack("i", rawPost)[0]
	if recLen!=postambleLen:
		raise ValueError("Record length at record start (%d) does not match"
			" postamble-declared length (%d) at %s"%(
				recLen, postambleLen, startPos))
	return data


def iterFortranRecs(f: BinaryIO, skip: int=0) -> Generator[bytes, None, None]:
	"""iterates over the fortran records in f.

	For details, see getFortranRec.
	"""
	while True:
		rec = getFortranRec(f)
		if rec is None:
			break
		if skip>0:
			skip -= 1
			continue
		yield rec
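

# A minimal usage sketch (not part of the original module): round-trip a
# payload through the record format using an in-memory file; each record
# is the payload framed by two native-endianness length words.
def _exampleFortranRec(): # pragma: no cover
	payload = b"some payload"
	marker = struct.pack("i", len(payload))
	f = io.BytesIO(marker+payload+marker)
	assert getFortranRec(f)==payload
	assert getFortranRec(f) is None # EOF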


def getWithCache(url: str, cacheDir: Filename,
		extraHeaders: dict={}) -> bytes:
	"""returns the content of url, from a cache if possible.

	Of course, you only want to use this if there's some external guarantee
	that the resource behind url doesn't change.  No expiry mechanism is
	present here.
	"""
	if not os.path.isdir(cacheDir):
		os.makedirs(cacheDir)
	cacheName = os.path.join(cacheDir, re.sub(r"[^\w]+", "", url)+".cache")
	if os.path.exists(cacheName):
		with open(cacheName, "rb") as f:
			return f.read()
	else:
		with urlrequest.urlopen(url) as f:
			doc = f.read()
		with open(cacheName, "wb") as f:
			f.write(doc)
		urlrequest.urlcleanup()
		return doc


def rstxToHTMLWithWarning(source: str, **userOverrides) -> Tuple[str, str]:
	"""returns HTML and a string with warnings for a piece of ReStructured
	text.

	source can be a unicode string or a byte string in utf-8.

	userOverrides will be added to the overrides argument of docutils'
	core.publish_parts.
	"""
	sourcePath, destinationPath = None, None
	if not isinstance(source, str):
		source = source.decode("utf-8")

	warnAccum = io.StringIO()
	overrides = {'input_encoding': 'unicode',
		'raw_enabled': True,
		'doctitle_xform': None,
		'warning_stream': warnAccum,
		'initial_header_level': 4}
	overrides.update(userOverrides)

	parts = rstcore.publish_parts(
		source=source+"\n", source_path=sourcePath,
		destination_path=destinationPath, writer_name='html',
		settings_overrides=overrides)
	return parts["fragment"], warnAccum.getvalue()


def rstxToHTML(source: str, **userOverrides) -> str:
	"""returns HTML for a piece of ReStructured text.

	source can be a unicode string or a byte string in utf-8.

	userOverrides will be added to the overrides argument of docutils'
	core.publish_parts.
	"""
	return rstxToHTMLWithWarning(source, **userOverrides)[0]
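

# A minimal usage sketch (not part of the original module): render a
# snippet and check that docutils produced no warnings; the exact HTML
# in the assertion reflects how docutils renders emphasis.
def _exampleRstxToHTML(): # pragma: no cover
	html, warnings = rstxToHTMLWithWarning("*emphasised* text")
	assert "<em>emphasised</em>" in html
	assert not warnings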


class CaseSemisensitiveDict(dict):
	"""A dictionary allowing case-insensitive access to its content.

	This is used for DAL renderers which, unfortunately, are supposed to
	be case insensitive.  Since case insensitivity is at least undesirable
	for service-specific keys, we take a semi-insensitive approach here:
	First, we try literal matches; if that does not work, we try matching
	against an all-uppercase version.

	Name clashes resulting from different names being mapped to the same
	normalized version are handled in some random way.  Don't do this.
	And don't rely on case normalization if at all possible.

	Only strings are allowed as keys here.  This class is not concerned
	with the values.

	>>> d = CaseSemisensitiveDict({"a": 1, "A": 2, "b": 3})
	>>> d["a"], d["A"], d["b"], d["B"]
	(1, 2, 3, 3)
	>>> d["B"] = 9; d["b"], d["B"]
	(3, 9)
	>>> del d["b"]; d["b"], d["B"]
	(9, 9)
	>>> "B" in d, "b" in d, "u" in d
	(True, True, False)
	>>> d.pop("a"), list(d.keys())
	(1, ['A', 'B'])
	"""
	def __init__(self, *args, **kwargs):
		dict.__init__(self, *args, **kwargs)
		self._normCasedCache = None

	def __getitem__(self, key: str) -> Any:
		try:
			return dict.__getitem__(self, key)
		except KeyError:
			pass # try again with normalized case.
		return self._normCased[key.upper()]

	def __setitem__(self, key: str, value: Any) -> None:
		self._normCasedCache = None
		dict.__setitem__(self, key, value)

	def __contains__(self, key: object) -> bool:
		key = cast(str, key)
		return dict.__contains__(self, key) or key.upper() in self._normCased

	def __delitem__(self, key: str) -> None:
		self.pop(key, None)

	def get(self, key: str, default: Any=None) -> Any:
		try:
			return self[key]
		except KeyError:
			return default

	def pop(self, key: str, default: Any=KeyError) -> Any:
		try:
			return dict.pop(self, key)
		except KeyError:
			pass # try again with normalized case.
		try:
			return self._normCased.pop(key.upper())
		except KeyError:
			if default is not KeyError:
				return default
			raise

	def copy(self):
		return CaseSemisensitiveDict(dict.copy(self))

	@property
	def _normCased(self) -> Dict[str, Any]:
		if self._normCasedCache is None:
			self._normCasedCache = dict(
				(k.upper(), v) for k, v in self.items())
		return self._normCasedCache

	@classmethod
	def fromDict(cls: Type['CaseSemisensitiveDict'],
			aDict: Union[dict, 'CaseSemisensitiveDict']
			) -> 'CaseSemisensitiveDict':
		if isinstance(aDict, CaseSemisensitiveDict):
			return aDict
		else:
			return cls(aDict)


def getCleanBytes(b: StrOrBytes) -> bytes:
	"""returns the bytes b in an ASCII representation.

	This is zlib-compressed base64 stuff.  b can be a string, too, in
	which case it's utf-8 encoded before marshalling.
	"""
	if isinstance(b, str):
		b = b.encode("utf-8")
	return base64.b64encode(
		zlib.compress(b)).replace(b"\n", b"")


def getDirtyBytes(b: bytes) -> bytes:
	"""returns b decoded and uncompressed.

	This is the inverse operation of getCleanBytes.  b must be bytes,
	and bytes is what you get back.
	"""
	return zlib.decompress(
		base64.b64decode(b))
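

# A minimal usage sketch (not part of the original module): strings survive
# a getCleanBytes/getDirtyBytes round trip as their utf-8 encoding.
def _exampleCleanDirtyBytes(): # pragma: no cover
	clean = getCleanBytes("Fernão de Magalhães")
	assert clean.isascii() # safe to embed in text formats
	assert getDirtyBytes(clean)=="Fernão de Magalhães".encode("utf-8")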


######################### pyparsing-based key-value lines.

from gavo.utils.parsetricks import (
	Word, alphas, QuotedString, Regex, OneOrMore, pyparsingWhitechars,
	pyparseString)


def _makeKVLGrammar():
	with pyparsingWhitechars(" \t"):
		keyword = Word(alphas+"_")("key")
		keyword.setName("Keyword")
		value = (QuotedString(quoteChar="'", escChar='\\')
			| Regex("[^'= \t]*"))("value")
		value.setName("Simple value or quoted string")
		pair = keyword - "=" - value
		pair.setParseAction(lambda s, p, t: (t["key"], t["value"]))

		line = OneOrMore(pair)
		line.setParseAction(lambda s, p, t: dict(list(t)))

	return line

_KVL_GRAMMAR = _makeKVLGrammar()


def parseKVLine(aString: str):
	"""returns a dictionary for a "key-value line".

	Key-value lines represent string-valued dictionaries following postgres
	libpq/dsn conventions (see the PQconnectdb docs): it's keyword=value,
	whitespace-separated, with whitespace allowed in values through single
	quoting, and backslash-escaping within quoted values.
	"""
	return pyparseString(_KVL_GRAMMAR, aString, parseAll=True)[0]


_IDENTIFIER_PATTERN = re.compile("[A-Za-z_]+$")

def makeKVLine(aDict: StrToStrMap) -> str:
	"""serializes a dictionary to a key-value line.

	See parseKVLine for details.
	"""
	parts = []
	for key, value in aDict.items():
		if not _IDENTIFIER_PATTERN.match(key):
			raise ValueError("'%s' not allowed as a key in key-value lines"%key)
		value = str(value)
		if not _IDENTIFIER_PATTERN.match(value):
			value = "'%s'"%value.replace("\\", "\\\\"
				).replace("'", "\\'")
		parts.append("%s=%s"%(key, value))
	return " ".join(sorted(parts))
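

# A minimal usage sketch (not part of the original module): values with
# blanks get single-quoted on serialisation and unquoted when parsed back.
def _exampleKVLine(): # pragma: no cover
	line = makeKVLine({"host": "localhost", "application_name": "my app"})
	# line is now "application_name='my app' host=localhost"
	assert parseKVLine(line)=={
		"host": "localhost", "application_name": "my app"}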


if __name__=="__main__": # pragma: no cover
	import doctest
	doctest.testmod()