Source code for gavo.utils.ostricks

"""
OS abstractions and related.

This module contains, in particular, the interface for having "easy subcommands"
using argparse.  The idea is to use the exposedFunction decorator on functions
that should be callable from the command line as subcommands; the functions
must all have the same signature. For example, if they all took the stuff
returned by argparse, you could say in the module containing them::

  args = makeCLIParser(globals()).parse_args()
  args.subAction(args)

To specify the command line arguments to the function, use Arg objects.  See
admin.py for an example.
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import argparse
import contextlib
import os
import sys
import tempfile
import urllib.request, urllib.parse

from gavo.utils import codetricks
from gavo.utils import excs
from gavo.utils import misctricks


from gavo.utils.dachstypes import (Any, BinaryIO, Callable,  Dict, Filename,
	Generator, IO, List, Optional, TextIO, Tuple, Union)


def safeclose(f: IO) -> None:
    """flushes the python file f, syncs it to disk, and closes it.

    Prefer this over a plain close() when the file is about to take the
    place of an existing version, so the new content has been handed to
    the operating system and fsync-ed before the old one goes away.
    """
    f.flush()
    descriptor = f.fileno()
    os.fsync(descriptor)
    f.close()
@contextlib.contextmanager
def safeReplaced(fName: Filename, *, binary: bool=True) -> Generator:
    """opens fName for "safe replacement".

    Safe replacement means that you can write to the object returned, and
    when everything works out all right, what you have written replaces
    the old content of fName, where the old mode is preserved if possible.
    When there are errors, however, the old content remains.

    The data is first written to a temporary file in fName's directory.
    That file is opened in binary mode unless binary is False.  If either
    the controlled block or the final replacement fails, the temporary file
    is removed and the original exception is re-raised.
    """
    mode = "wb" if binary else "w"
    targetDir = os.path.abspath(os.path.dirname(fName))
    try:
        oldMode = os.stat(fName).st_mode
    except OSError:
        # fName does not exist yet (or cannot be stat-ed): no mode to restore
        oldMode = None

    handle, tempName = tempfile.mkstemp(".temp", "", dir=targetDir)
    targetFile = os.fdopen(handle, mode)

    try:
        yield targetFile
        # The block succeeded: get the data onto the disk and swap it in.
        # This happens inside the try so a failing commit does not leave
        # the temporary file behind either.
        safeclose(targetFile)
        # os.replace overwrites an existing target on all platforms,
        # whereas os.rename refuses to do that on Windows.
        os.replace(tempName, fName)
    except BaseException:
        # We are throwing the data away, so there is no point in syncing
        # it.  Nothing in the cleanup may mask the exception that brought
        # us here; hence, problems while cleaning up are ignored.
        with contextlib.suppress(Exception):
            targetFile.close()
        with contextlib.suppress(OSError):
            os.unlink(tempName)
        raise

    if oldMode is not None:
        # restoring the mode is best-effort by contract ("if possible")
        with contextlib.suppress(OSError):
            os.chmod(fName, oldMode)
class _UrlopenRemotePasswordMgr(urllib.request.HTTPPasswordMgr):
    """A password manager taking its credentials from up the call stack.

    This cooperates with urlopenRemote, which keeps the credentials to use
    in a local variable called _temp_credentials; that variable is fetched
    here through codetricks.stealVar.  When it is non-None, it is returned
    as the (user, password) pair regardless of realm and URI.

    Since the credentials go to *any* realm that asks, stealing passwords
    is almost trivial, at least with http basic auth.
    """
    def find_user_password(self, realm: str, authuri: str
            ) -> Tuple[Optional[str], Optional[str]]:
        stolen = codetricks.stealVar("_temp_credentials")
        return (None, None) if stolen is None else stolen


try:
    import ssl

    class HTTPSHandler(urllib.request.HTTPSHandler):
        # This https handler deliberately does not check certificates
        # when it has to build its own SSL context.  That may look
        # daring, but in our business expired certs are normal, whereas
        # man-in-the-middle attacks are not.
        def __init__(self,
                debuglevel: int=0,
                context: Optional[ssl.SSLContext]=None):
            if context is None:
                context = ssl.create_default_context(
                    purpose=ssl.Purpose.SERVER_AUTH,
                    cafile="/etc/ssl/certs/ca-certificates.crt")
                # check_hostname must be off before verify_mode may
                # be set to CERT_NONE
                context.check_hostname = False
                context.verify_mode = ssl.CERT_NONE

            super().__init__(debuglevel, context)

except (ImportError, IOError):
    # probably the ssl bundle isn't where I think it is; just use
    # the normal, certificate-checking https handler.
    from urllib.request import HTTPSHandler # type: ignore


def _makeRestrictedOpener() -> urllib.request.OpenerDirector:
    """returns the opener used by urlopenRemote.

    It only has handlers for http, https, and ftp (plus redirects, basic
    auth, and error processing) and sends the DaCHS user agent.
    """
    try:
        httpsHandler = HTTPSHandler()
    except IOError:
        # some versions of ssl only check for the CA bundle at HTTPSHandler
        # construction time.  If that happens, fall back to the urllib
        # default handler
        httpsHandler = urllib.request.HTTPSHandler()

    opener = urllib.request.OpenerDirector()
    for handler in [
            urllib.request.HTTPRedirectHandler(),
            urllib.request.HTTPHandler(),
            httpsHandler,
            urllib.request.HTTPBasicAuthHandler(_UrlopenRemotePasswordMgr()),
            urllib.request.HTTPErrorProcessor(),
            urllib.request.FTPHandler(),
            urllib.request.UnknownHandler()]:
        opener.add_handler(handler)

    opener.addheaders = [("user-agent", "GAVO DaCHS HTTP client")]
    return opener


_restrictedURLOpener = _makeRestrictedOpener()
def setUserAgent(userAgent: str) -> None:
    """makes userAgent the user agent string sent by urlopenRemote.

    The string is stored on the module-global opener.  It is therefore a
    global setting and, in particular, nowhere near thread-safe.
    """
    currentHeaders = _restrictedURLOpener.addheaders
    # sanity check: the user agent is expected to be the only extra header
    assert len(currentHeaders)==1
    _restrictedURLOpener.addheaders = [("user-agent", userAgent)]
def _getURLErrorMessage(exc: Exception) -> str:
    """returns the most specific human-readable message found in exc.

    If exc's first argument is itself an exception, the second argument of
    that nested exception is used when present (for os/socket type errors
    this is the error string).  Whenever this does not yield a string,
    str(exc) is returned.
    """
    fallback = str(exc)
    try:
        detail = exc.args[0]
        if isinstance(detail, Exception):
            try:
                # maybe it's an os/socket type error
                detail = detail.args[1]
            except IndexError:
                # maybe not...
                pass
    except Exception:
        # there's going to be an error message, albeit maybe a weird one
        return fallback
    return detail if isinstance(detail, str) else fallback


def urlopenRemote(url: str, *,
        data: Union[None,dict,str,bytes] = None,
        creds: Tuple[Optional[str], Optional[str]]=(None, None),
        timeout: int=100) -> BinaryIO:
    """works like urllib.urlopen, except only http, https, and ftp URLs
    are handled.

    The function also massages the error messages of urllib a bit.  urllib
    errors always become IOErrors (which is more convenient within DaCHS).

    data, if given, is sent as the request body.  A dict is urlencoded
    (using utf-8) first; a str is encoded as ASCII.

    creds may be a pair of username and password.  Those credentials
    will be presented in http basic authentication to any server
    that cares to ask.  For both reasons, don't use any valuable credentials
    here.

    timeout is passed on to the opener.

    An IOError is raised when the URL cannot be opened or when the opener
    returns no response.
    """
    # The name in the next line is used in _UrlopenRemotePasswordMgr
    _temp_credentials = creds #noflake: Picked up from down the call chain

    if isinstance(data, dict):
        data = urllib.parse.urlencode(data, encoding="utf-8")
    if isinstance(data, str):
        data = data.encode("ascii")

    try:
        res = _restrictedURLOpener.open(url, data, timeout=timeout)
    except (urllib.error.URLError, ValueError) as msg:
        raise IOError("Could not open URL %s: %s"%(
            url, _getURLErrorMessage(msg))) from msg

    if res is None:
        raise IOError("Could not open URL %s -- does the resource exist?"%
            url)
    return res
def fgetmtime(fileobj: IO) -> float:
    """returns the modification time of the file behind the open file
    object fileobj.

    This is like os.path.getmtime, except that it works from the file
    object and hence needs no file name.

    An os.error is raised if the file cannot be fstat-ed; that includes
    objects for which obtaining the stat result fails with an
    AttributeError.
    """
    try:
        statResult = os.fstat(fileobj.fileno())
        return statResult.st_mtime
    except AttributeError:
        raise misctricks.logOldExc(os.error("Not a file: %s"%repr(fileobj)))
def cat(srcF: IO, destF: IO, chunkSize: int=1<<20) -> None:
    """copies srcF to destF, reading and writing chunkSize items at a time.

    Copying stops as soon as a read returns something false (i.e., an
    empty result).
    """
    chunk = srcF.read(chunkSize)
    while chunk:
        destF.write(chunk)
        chunk = srcF.read(chunkSize)
def ensureDir(dirPath: str, *,
        mode: Optional[int]=None,
        setGroupTo: Optional[int]=None) -> None:
    """makes sure that dirPath exists and is a directory.

    If dirPath does not exist, it is created, and its permissions are
    set to mode with group ownership setGroupTo if those are given.
    setGroupTo must be a numerical gid if given.  Only the last path
    element is created; the parent directory must already exist.

    If dirPath already is a directory, nothing is changed (in particular,
    mode and setGroupTo are ignored).  If dirPath exists but is not a
    directory, a FileExistsError is raised.

    This function may raise all kinds of os.errors if something goes wrong.
    These probably should be handed through all the way to the user since
    when something fails here, there's usually little the program can
    safely do to recover.
    """
    if os.path.isdir(dirPath):
        return

    try:
        os.mkdir(dirPath)
    except FileExistsError:
        if os.path.isdir(dirPath):
            # someone else created the directory between our check and
            # our mkdir; that's fine with us.
            return
        # there's something at dirPath, but it's not a directory
        raise

    if mode is not None:
        os.chmod(dirPath, mode)
    # compare against None: gid 0 is a valid group
    if setGroupTo is not None:
        os.chown(dirPath, -1, setGroupTo)
class Arg:
    """a specification of one argument or option of a subcommand.

    Construct these with the positional and keyword parameters you would
    pass to argparse's add_argument; they are stored and handed on
    unchanged by add.
    """
    def __init__(self, *args, **kwargs):
        self.args = args
        self.kwargs = kwargs

    def add(self, parser):
        """declares this argument on parser through its add_argument method.
        """
        parser.add_argument(*self.args, **self.kwargs)
def exposedFunction(argSpecs: Optional[List[Arg]]=None,
        help: Optional[str]=None) -> Callable:
    """a decorator exposing a function to parseArgs.

    argSpecs is a sequence of Arg objects.  This defines the command line
    interface to the function.  When it is left out, the function takes
    no command line arguments.

    help, if given, is stored as the help text for the subcommand.

    The decorated function itself must accept a single argument,
    the args object returned by argparse's parse_args.

    The decorator returns the function itself, with the attributes
    subparseArgs and subparseHelp added.
    """
    # Make a fresh list per decorator rather than using a mutable default
    # argument, which would be shared between all functions exposed without
    # explicit argSpecs.
    specs = [] if argSpecs is None else argSpecs

    def deco(func):
        func.subparseArgs = specs
        func.subparseHelp = help
        return func

    return deco
class _PrefixMatchDict(dict):
    """A dictionary matching on unique prefixes.

    This is just barely enough to enable longest-prefix matching for
    argparse: only item access and containment tests are prefix-aware.

    A key that is stored literally always wins, even if it also is a
    prefix of other keys.
    """
    def __getitem__(self, key: str) -> Any:  # type: ignore[override]
        """returns the value for key or for the one stored key key is a
        prefix of.

        This raises a KeyError if nothing matches and an
        excs.ReportableError if key is a prefix of several stored keys
        without being one of them.
        """
        # exact matches must not be reported as ambiguous just because
        # some other key happens to start with them.
        if dict.__contains__(self, key):
            return dict.__getitem__(self, key)

        matches = [s for s in self if s.startswith(key)]
        if not matches:
            raise KeyError(key)
        if len(matches)>1:
            raise excs.ReportableError("Ambiguous subcommand specification;"
                " choose between %s."%repr(matches))
        return dict.__getitem__(self, matches[0])

    def __contains__(self, key: str) -> bool:  # type: ignore[override]
        """returns true if key is a prefix of at least one stored key.
        """
        return any(s.startswith(key) for s in self)
def makeCLIParser(functions: Dict[str, Any]) -> argparse.ArgumentParser:
    """returns a command line parser with one subcommand per exposed
    function in functions.

    functions is a dictionary mapping names to objects (as returned from
    globals()).  Every value having a subparseArgs attribute becomes a
    subcommand named like its key; the exposedFunction decorator adds that
    attribute.  subparseArgs must be a sequence of Arg items, and
    subparseHelp is used as both description and help of the subcommand.

    After parsing, the selected function is available as the subAction
    attribute of the result.

    Subcommands may be abbreviated to unique prefixes if argparse's
    internals let us arrange for that; otherwise, a warning is sent and
    full names are required.
    """
    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers()

    for cmdName, candidate in functions.items():
        argSpecs = getattr(candidate, "subparseArgs", None)
        if argSpecs is None:
            # not something that was exposed as a subcommand
            continue

        cmdParser = subparsers.add_parser(
            cmdName,
            description=candidate.subparseHelp,
            help=candidate.subparseHelp)
        for spec in argSpecs:
            spec.add(cmdParser)
        cmdParser.set_defaults(subAction=candidate)

    # Now monkeypatch matching of unique prefixes into argparse guts.
    # If the guts change, don't fail hard, just turn off prefix matching
    try:
        for action in parser._actions:
            if isinstance(action, argparse._SubParsersAction):
                prefixMap = _PrefixMatchDict(action._name_parser_map)
                action.choices = prefixMap
                action._name_parser_map = prefixMap
                break
    except Exception as msg:
        # no prefix matching, then
        misctricks.sendUIEvent("Warning",
            "Couldn't teach prefix matching to argparse: %s"%repr(msg))

    return parser
class StatusDisplay:
    """A context manager for updating a one-line display.

    This shouldn't be used from DaCHS proper (which should use
    base.ui.notify*), but it's sometimes handy in helper scripts.

    In short::

        with StatusDisplay() as d:
            for i in range(300):
                d.update(str(i))

    dest_f is the text file the display is written to; when it is not
    given, whatever sys.stdout is at construction time is used.
    """
    def __init__(self, dest_f: Optional[TextIO]=None):
        # Look up sys.stdout now rather than in the signature; a default
        # of sys.stdout would be bound once at import time and would
        # ignore any later redirection of standard output.
        self.dest_f = sys.stdout if dest_f is None else dest_f
        # what to write in order to get rid of the current content
        self.clearer = "\r\n"

    def update(self, new_content: str) -> None:
        """replaces the currently displayed text with new_content.
        """
        self.dest_f.write(self.clearer+new_content)
        self.dest_f.flush()
        # next time, overwrite what we just wrote with blanks and
        # return to the start of the line
        self.clearer = "\r"+(" "*len(new_content))+"\r"

    def __enter__(self):
        self.dest_f.write(self.clearer)
        self.dest_f.flush()
        return self

    def __exit__(self, *args):
        # leave the last status on screen and move on to a new line
        self.dest_f.write("\r\n")
        self.dest_f.flush()