Source code for

General event handling.

Basically, everything roughly classified as user interaction should go
through this module.  gavo.base, on import, creates an instance of
EventDispatcher and installs it as base.ui.  The rest of the library
can then call methods of base.ui.

Clients can then register observers (probably derived from a shared observer
base class) that subscribe to events and can display or log them in some
form appropriate to the client.

#c Copyright 2008-2023, the GAVO project <>
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.

import contextlib
import sys

from gavo import utils

# the cli sets this to true if exceptions should not be mutated.

class DispatcherType(type):
	"""is a metaclass for dispatching of messages.

	Basically, you define methods called notify<whatever> in your class.
	For each of them, subscribe<whatever> and unsubscribe<whatever>
	methods are added, and notify<whatever> itself is replaced by a
	wrapper.  When the wrapper is called, your defined method is called,
	and its result is then passed to all callbacks passed in through
	subscribe<whatever>; the result is also returned to the caller.

	Classes using this metaclass have to provide, on their instances,

	* a callbacks attribute mapping each event name to a list of callables
	  (the names are available in the class attribute eventTypes), and
	* generic subscribe(evName, callback) and unsubscribe(evName, callback)
	  methods; the generated per-event methods just delegate to these.

	eventTypes lists the <whatever> parts of all notify methods the class
	knows about, including those inherited from base classes, so a subclass
	building its callbacks from eventTypes can still fire its parents' events.
	"""

	def __init__(cls, name, bases, dict):
		"""creates the class and equips it with notifiers.

		The arguments are the usual (name, bases, namespace) triple passed
		to metaclasses.
		"""
		type.__init__(cls, name, bases, dict)
		# Start out with what the bases already know; this is a fresh list
		# per class so registering our own events does not leak into the
		# parents' eventTypes.
		cls.eventTypes = []
		for base in bases:
			for inherited in getattr(base, "eventTypes", ()):
				if inherited not in cls.eventTypes:
					cls.eventTypes.append(inherited)
		cls._makeNotifiers(dict)

	def _makeNotifier(cls, name, callable):
		"""installs notify/subscribe/unsubscribe methods for the event name.

		callable is the user-defined notify method; its return value is what
		the callbacks receive.
		"""
		# an overridden notify method must not register its event twice
		if name not in cls.eventTypes:
			cls.eventTypes.append(name)

		def notify(self, *args, **kwargs):
			res = callable(self, *args, **kwargs)
			# Iterate over a snapshot: a callback may unsubscribe itself (or
			# others) while we are dispatching, and mutating the list we
			# iterate over would make us skip callbacks.
			for callback in tuple(self.callbacks[name]):
				callback(res)
			return res

		def subscribe(self, callback):
			self.subscribe(name, callback)

		def unsubscribe(self, callback):
			self.unsubscribe(name, callback)

		# keep the wrappers recognisable in tracebacks and generated docs
		notify.__name__ = "notify"+name
		notify.__doc__ = getattr(callable, "__doc__", None)
		subscribe.__name__ = "subscribe"+name
		unsubscribe.__name__ = "unsubscribe"+name

		setattr(cls, "notify"+name, notify)
		setattr(cls, "subscribe"+name, subscribe)
		setattr(cls, "unsubscribe"+name, unsubscribe)

	def _makeNotifiers(cls, dict):
		"""calls _makeNotifier for every notify<whatever> entry in dict.

		dict is the class namespace as passed to the metaclass.
		"""
		prefix = "notify"
		for name, val in list(dict.items()):
			# A bare "notify" has no event name; wrapping it would clobber
			# the generic subscribe/unsubscribe methods, so leave it alone.
			if name.startswith(prefix) and len(name)>len(prefix):
				cls._makeNotifier(name[len(prefix):], val)
class EventDispatcher(object, metaclass=DispatcherType):
	"""is the central event dispatcher.

	Events are posted by using notify* methods.  Various handlers can then
	attach to them through the matching subscribe* methods (or through the
	generic subscribe method).

	Because of the metaclass, every notify* method defined here is wrapped:
	whatever the method returns is passed to each callback subscribed to
	the event and is also returned to the caller.  The "callbacks receive
	..." remarks in the docstrings below refer to these return values.

	Instance attributes:

	* callbacks -- a dict mapping event names to lists of callbacks
	* sourceStack, curSource -- the sources currently being processed and
	  the innermost of them (see notifyNewSource)
	* totalShippedOut, totalRead, lastRow -- import statistics
	* longRunningProcess -- description of the active long-running process,
	  or None
	"""

	def __init__(self):
		self.callbacks = {name: [] for name in self.eventTypes}
		self.sourceStack = [None]
		self.curSource = None
		# statistics of dachs imp
		self.totalShippedOut = 0
		self.totalRead = 0
		# the last row fed in by a grammar
		self.lastRow = None
		# the name of a long-running process (if any)
		self.longRunningProcess = None

	@contextlib.contextmanager
	def suspended(self, evName):
		"""a context manager suspending notification for a specific event.

		This is mainly for use by test code that wants to avoid spilling
		too much junk into the log.

		One weak point here is that any subscriptions entered while
		notification is suspended are lost.  So: Don't suspend notifications
		for normal code.
		"""
		origCallbacks = self.callbacks[evName]
		self.callbacks[evName] = []
		try:
			yield
		finally:
			# restore the original list object, dropping whatever was
			# subscribed in the meantime
			self.callbacks[evName] = origCallbacks

	def subscribe(self, evName, callback):
		"""adds callback to evName's callback list.

		A KeyError is raised for event names not in eventTypes.
		"""
		self.callbacks[evName].append(callback)

	def unsubscribe(self, evName, callback):
		"""removes a callback from evName's callback list.

		It is not an error to unsubscribe a callback that's not subscribed.
		"""
		try:
			self.callbacks[evName].remove(callback)
		except ValueError:
			pass

	def notifyExceptionMutation(self, newExc):
		"""is called when an exception is being handled by raising newExc.

		The callbacks are passed a pair of sys.exc_info() and newExc.
		"""
		return sys.exc_info(), newExc

	def logOldExc(self, newExc):
		"""notifies of an ExceptionMutation and returns newExc.

		This is just a convenience when mutating exceptions.

		If the module-level flag PDB_ENABLED is true, the exception currently
		being handled is re-raised instead, so a debugger gets to see the
		original exception.  In that mode, this must be called from within
		an except block.
		"""
		# Look the flag up defensively: a module that never defined
		# PDB_ENABLED behaves as if it were switched off rather than
		# dying with a NameError in the middle of error handling.
		if globals().get("PDB_ENABLED", False):
			# leave original exception for the pdb
			raise
		self.notifyExceptionMutation(newExc)
		return newExc

	def notifyNewSource(self, sourceToken):
		"""is called when a new source is being operated on.

		The callbacks are passed some, hopefully useful, token string.  For
		file source, this is the file name, otherwise we try to make up
		something.

		As side effects, the curSource attribute is set to this value, and
		the value is pushed onto sourceStack.
		"""
		sourceName = utils.makeSourceEllipsis(sourceToken)
		self.curSource = sourceName
		self.sourceStack.append(sourceName)
		return sourceName

	def notifySourceError(self):
		"""is called when a parse error occurred in a source.

		The callbacks are passed the name of the failing source (or the
		string "Undefined" if no source was on the stack).
		"""
		if self.sourceStack:  # user-defined grammars may fail to push one
			lastSource = self.sourceStack.pop()
		else:
			lastSource = "Undefined"
		try:
			self.curSource = self.sourceStack[-1]
		except IndexError:  # this would be an internal error...
			self.curSource = None
		return lastSource

	def notifySourceFinished(self):
		"""is called when a source file has been processed.

		The curSource attribute is updated, and its old value is propagated
		to the callbacks.

		If the source stack is already empty, the callbacks receive None.
		"""
		try:
			lastSource = self.sourceStack.pop()
		except IndexError:
			# someone notified us of a finish without telling us first
			# they started.  Don't fail because of this.
			lastSource = None
		# Handle an emptied stack separately from the pop: otherwise a
		# successfully popped source would be reported as None and
		# curSource would keep pointing at the source just finished.
		self.curSource = self.sourceStack[-1] if self.sourceStack else None
		return lastSource

	def notifyShipout(self, numItems):
		"""is called when certain table implementations store items.

		The number of items is passed on to the callbacks.  As a side effect,
		the instance variable totalShippedOut is adjusted.

		InMemoryTables don't call this right now and probably never will.
		"""
		self.totalShippedOut += numItems
		return numItems

	def notifyIncomingRow(self, row):
		"""is called when certain grammars yield a row to the DC's belly.

		The callbacks receive a reference to the row.  As a side effect,
		the instance variable totalRead is bumped up, and lastRow becomes
		the row passed in.

		To support this, RowIterators have to call this method in their
		_iterRows.  Most will do, DictlistGrammars, e.g., don't.
		"""
		self.totalRead += 1
		self.lastRow = row
		return row

	def notifyIndexCreation(self, indexName):
		"""is called when an index on a DB table is created.

		The callbacks receive the index name.
		"""
		return indexName

	def notifyScriptRunning(self, script, source):
		"""is called when a script is being started.

		The callback receives a pair of a scripting.Script and the object
		the script sits on.
		"""
		return script, source

	def notifyError(self, errmsg):
		"""is called when something wants to put out an error message.

		The handlers receive the error message as-is.

		In general, you will be in an exception context when you receive
		this error, but your handlers should not bomb out when you are not.
		"""
		return errmsg

	def notifyFailure(self, flr, preMsg=""):
		"""is called when an unexpected twisted failure is being processed.

		Call this *exclusively* from errbacks and never while handling an
		exception, or you'll get two tracebacks in the logs.

		You should not listen on this, since the handler just receives None.
		Rather, these events are converted to ErrorOccurreds including the
		failure's traceback.
		"""
		try:
			self.notifyError(preMsg +"\n" +str(flr.value))
		except Exception:  # botched str() of exception.  Oh my.
			# Only ordinary errors are swallowed here; KeyboardInterrupt
			# and SystemExit must be allowed to propagate.
			self.notifyError("Swallowing unstringable exception")

		# If we're not handling an exception (which is generally true
		# when we're here because failures typically come up in errbacks),
		# provide a trace to info, because other handlers can't see it.
		if sys.exc_info()==(None, None, None):
			self.notifyInfo("Traceback (nf) a request:\n %s\n%s\n"%(
				flr.getErrorMessage(), flr.getTraceback()))

	def notifyWarning(self, message):
		"""is called when something tries to communicate non-fatal trouble.

		The handlers receive the message as-is.
		"""
		return message

	def notifyInfo(self, message):
		"""is called when something tries to emit auxiliary information.

		The handlers receive the message as-is.
		"""
		return message

	def notifyDebug(self, message):
		"""is called when something wants to communicate information only
		useful when trying to figure out a malfunction.

		The handlers receive the message as-is.
		"""
		return message

	def notifyWebServerUp(self):
		"""is called when the webserver is up and running.

		No arguments are transmitted; the handlers receive an empty tuple.
		"""
		return ()

	def notifyDBTableModified(self, fqName):
		"""is called when an existing database table has been modified.

		The argument is the fully qualified table name.
		"""
		return fqName

	def notifyProcessStarts(self, procDesc):
		"""is called when a potentially long-running process presumably
		watched by the user starts.

		There can only be one of those at any time per dachs instance.  You
		have to make sure that notifyProcessEnded is being called.

		A utils.ReportableError is raised if a process is already active.
		"""
		if self.longRunningProcess is not None:
			# make this an error although it's a UI thing; if this happens,
			# it's pretty clear that someone hasn't cleaned up after
			# themselves, and they need to fix that.
			raise utils.ReportableError(
				f"A process was already active ({self.longRunningProcess}).")
		self.longRunningProcess = procDesc
		return procDesc

	def notifyProcessEnded(self):
		"""is called when a potentially long-running process has ended.

		The event handler receives the name of the long running process
		(None if there was none).  longRunningProcess is reset to None.
		"""
		endedProcess = self.longRunningProcess
		self.longRunningProcess = None
		return endedProcess

	def notifyProgress(self, progress):
		"""is called to update a progress indicator.

		Progress is a message (like "56%" or so) that is being handed through
		to the handler.

		A utils.ReportableError is raised when no long-running process has
		been announced through notifyProcessStarts.
		"""
		if not self.longRunningProcess:
			raise utils.ReportableError("Progress report without process")
		return progress