Source code for gavo.rscdef.scripting

"""
Support code for attaching scripts to objects.

Scripts can be either in python or in SQL.  They always live on
make instances.  For details, see Scripting in the reference
documentation.
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


from gavo import base
from gavo import utils
from gavo.base import sqlsupport
from gavo.rscdef import rmkfuncs
from gavo.utils.parsetricks import (
	OneOrMore, ZeroOrMore, QuotedString, Forward,
	SkipTo, StringEnd, Regex, Suppress,
	Literal, pyparsingWhitechars)



class Error(base.Error):
	"""The exception type of this module.

	It adds nothing to base.Error.
	"""
def _getSQLScriptGrammar():
	"""returns a pyparsing ParserElement that splits SQL scripts into
	individual commands.

	The rules are: Statements are separated by semicolons, empty statements
	are allowed.

	SQL comments (both ``--`` and ``/* */``) are suppressed.  Single-quoted
	strings, dollar-quoted strings and double-quoted identifiers are passed
	through with their quotes.  The pieces making up one statement are joined
	with single blanks, and statements that only consist of whitespace are
	dropped from the result.
	"""
	def joinAtoms(s, p, toks):
		# one statement is the blank-joined sequence of its atoms
		return " ".join(toks)

	def dropBlankStatements(s, p, toks):
		# empty statements are legal in the input but not interesting
		return [t for t in toks.asList() if str(t).strip()]

	# all parser elements are built while only blank and tab count as
	# skippable whitespace
	with pyparsingWhitechars(" \t"):
		dashComment = Literal("--")+SkipTo("\n", include=True)
		blockComment = Literal("/*")+SkipTo("*/", include=True)
		comment = dashComment | blockComment

		newline = Literal("\n")

		singleQuoted = QuotedString(quoteChar="'", escChar="\\",
			multiline=True, unquoteResults=False)
		doubleQuoted = QuotedString(quoteChar='"', escChar="\\",
			unquoteResults=False)
		dollarQuoted = Regex(r"(?s)\$(\w*)\$.*?\$\1\$")
		dollarQuoted.setName("dollarQuoted")

		# a double-quoted identifier is not really a string literal, but
		# for splitting purposes it is treated as one.
		strLiteral = singleQuoted | dollarQuoted | doubleQuoted
		strLiteral.setName("strLiteral")

		other = Regex("[^;'\"$]+")
		other.setName("other")

		# a dollar sign that is not followed by a second one
		literalDollar = Literal("$") + ~ Literal("$")

		statementEnd = (Literal(';') + ZeroOrMore(newline)) | StringEnd()

		atom = Forward()
		atom.setName("Atom")
		atom << (
			Suppress(comment)
			| other
			| strLiteral
			| literalDollar)

		statement = OneOrMore(atom) + Suppress(statementEnd)
		statement.setName("statement")
		statement.setParseAction(joinAtoms)

		script = OneOrMore(statement) + StringEnd()
		script.setName("script")
		script.setParseAction(dropBlankStatements)

		if False:
			# change the condition to get pyparsing debug output
			for element in (atom, comment, other, strLiteral, statement,
					statementEnd, dollarQuoted, literalDollar):
				element.setDebug(True)

		return script


getSQLScriptGrammar = utils.CachedGetter(_getSQLScriptGrammar)
class ScriptRunner(object):
	"""The common base of the objects preparing and executing scripts.

	Runners are constructed with a single script object, from which they
	take ``name``, ``notify`` and the source position (kept as ``pos``).
	Concrete runners offer a method ``run(dbTable, **kwargs)``.

	Do not override ``__init__`` unless you have to; override
	``_prepare(script)`` instead, which ``__init__`` calls as its last step.
	The ``_prepare`` defined here always raises a ValueError, so this class
	itself cannot be instantiated.
	"""
	def __init__(self, script):
		self.name = script.name
		self.notify = script.notify
		self.pos = script.getSourcePosition()
		self._prepare(script)

	def _prepare(self, script):
		"""does runner-specific setup; to be overridden by subclasses.
		"""
		raise ValueError("Cannot instantiate plain ScriptRunners")
class SQLScriptRunner(ScriptRunner):
	"""A runner for SQL scripts.

	The statements are executed through the connection of the table passed
	to run.  Keyword arguments to run are ignored.
	"""
	def _prepare(self, script):
		"""splits the script source into individual statements.
		"""
		grammar = getSQLScriptGrammar()
		self.statements = utils.pyparseString(grammar, script.getSource())

	def run(self, dbTable, **kwargs):
		"""executes all statements of the script on dbTable's connection.

		Percent signs are doubled in each statement, and the result is
		passed through dbTable.expand before execution.  Any exception is
		logged through base.ui.logOldExc and turned into a StructureError
		carrying the script's name and source position.
		"""
		try:
			for sqlCommand in self.statements:
				escaped = sqlCommand.replace("%", "%%")
				expanded = dbTable.expand(escaped)
				dbTable.connection.execute(expanded)
		except Exception as exc:
			failure = base.StructureError(
				"Execution of SQL script %s failed: %s"%(self.name, exc),
				self.pos)
			raise base.ui.logOldExc(failure)
class ACSQLScriptRunner(SQLScriptRunner):
	"""A runner for "autocommitted" SQL scripts.

	These work like SQLScriptRunners, except that a savepoint is created
	before each statement.  When a statement raises an SQL error, the
	savepoint is restored and the error is reported through
	base.ui.notifyError; in other words, ACSQL scripts turn SQL errors
	into warnings.
	"""
	def run(self, dbTable, **kwargs):
		"""executes the script's statements, each guarded by a savepoint.

		Keyword arguments are ignored.  Note that the statements are not
		passed through dbTable.expand here.
		"""
		connection = dbTable.connection
		for sqlCommand in self.statements:
			try:
				connection.execute("SAVEPOINT beforeStatement")
				try:
					escaped = sqlCommand.replace("%", "%%")
					connection.execute(escaped)
				except sqlsupport.DBError as exc:
					# undo what the failed statement did and carry on
					connection.execute(
						"ROLLBACK TO SAVEPOINT beforeStatement")
					base.ui.notifyError(
						"Ignored error during script execution: %s"%exc)
			finally:
				# the savepoint is released whether or not the
				# statement went through
				connection.execute("RELEASE SAVEPOINT beforeStatement")
class PythonScriptRunner(ScriptRunner):
	"""A runner for python scripts.

	The scripts can access the current table as table (and thus run
	SQL statements through table.connection.execute(query, pars)).

	Additional keyword arguments are available under their names.

	You are in the namespace of usual procApps (like procs, rowgens, and
	the like).
	"""
	def __init__(self, script):
		# I need to memorize the script as I may need to recompile
		# it if there's special arguments (yikes!)
		# NOTE(review): whitespace in this listing was collapsed; verify the
		# indent literal passed to fixIndentation against the pristine file.
		self.code = ("def scriptFun(table, **kwargs):\n"+
			utils.fixIndentation(script.getSource(), " ")+"\n")
		ScriptRunner.__init__(self, script)

	def _compile(self, moreNames=None):
		"""returns the script compiled into a function scriptFun(table, **kwargs).

		moreNames, if given, is a dictionary of extra names handed on
		to rmkfuncs.makeProc as keyword arguments.  None (the default) means
		no extra names; a None sentinel is used rather than a shared
		mutable default.
		"""
		return rmkfuncs.makeProc("scriptFun", self.code, "", self,
			**(moreNames or {}))

	def _prepare(self, script, moreNames=None):
		"""compiles the script without extra names and keeps the result
		in self.scriptFun.

		moreNames is accepted for compatibility but not used here.
		"""
		self.scriptFun = self._compile()

	def run(self, dbTable, **kwargs):
		"""runs the script on dbTable, passing on kwargs.

		Any exception is logged through base.ui.logOldExc and turned into
		a StructureError carrying the script's name and source position.
		"""
		# I want the names from kwargs to be visible as such in scriptFun -- if
		# given.  Since I do not want to manipulate func_globals, the only
		# way I can see to do this is to compile the script.  I don't think
		# this is going to be a major performance issue.
		try:
			if kwargs:
				func = self._compile(kwargs)
			else:
				func = self.scriptFun
			func(dbTable, **kwargs)
		except Exception as msg:
			raise base.ui.logOldExc(
				base.StructureError(
					"Execution of python script %s failed: %s"%(self.name, msg),
					self.pos))
# maps the values allowed in a script's lang attribute to the runner
# classes executing scripts in that language.
RUNNER_CLASSES = {
	"SQL": SQLScriptRunner,
	"python": PythonScriptRunner,
	"AC_SQL": ACSQLScriptRunner,
}
class Script(base.Structure, base.RestrictionMixin):
	"""A script, i.e., some executable item within a resource descriptor.

	The content of scripts is given by their type -- usually, they are
	either python scripts or SQL with special rules for breaking the
	script into individual statements (which are basically like python's).

	The special language AC_SQL is like SQL, but execution errors are
	ignored.  This is not what you want for most data RDs (it's intended
	for housekeeping scripts).

	See `Scripting`_.
	"""
	name_ = "script"
	typeDesc_ = "Embedded executable code with a type definition"

	# the languages available are exactly the keys of RUNNER_CLASSES
	_lang = base.EnumeratedUnicodeAttribute(
		"lang",
		default=base.Undefined,
		description="Language of the script.",
		validValues=RUNNER_CLASSES.keys(),
		copyable=True)

	# the phase in which the script is run; validate checks this
	# against what the parent accepts
	_type = base.EnumeratedUnicodeAttribute(
		"type",
		default=base.Undefined,
		description="Point of time at which script is to run (not all"
			" script types are allowed on all elements).",
		validValues=[
			"preImport",
			"newSource",
			"preIndex",
			"preCreation",
			"postCreation",
			"afterMeta",
			"beforeDrop",
			"sourceDone"],
		copyable=True)

	# when not given, completeElement fills this in
	_name = base.UnicodeAttribute(
		"name",
		default=base.NotGiven,
		description="A human-consumable designation of the script.",
		copyable=True)

	_notify = base.BooleanAttribute(
		"notify",
		default=True,
		description="Send out a notification when running this"
			" script.",
		copyable=True)

	_content = base.DataContent(
		copyable=True,
		description="The script body.")

	_original = base.OriginalAttribute()

	def getSource(self):
		"""returns the content with all macros expanded.

		The expansion is done by the parent's expander.
		"""
		expander = self.parent.getExpander()
		return expander.expand(self.content_)

	def validate(self):
		"""raises a StructureError if the parent does not accept scripts
		of this script's type.

		No check takes place when the parent is false in a boolean context.
		"""
		if self.parent:
			if self.type not in self.parent.acceptedScriptTypes:
				raise base.StructureError(
					"Invalid script type %s for %s elements"%(
						self.type, self.parent.name_))
		super().validate()

	def completeElement(self, ctx):
		"""makes sure the script has a name.

		When no name was given, the id is used if there is one, and
		"anonymous" otherwise.
		"""
		if self.name is base.NotGiven:
			self.name = self.id if self.id else "anonymous"
		super().completeElement(ctx)
class ScriptingMixin(base.StructCallbacks):
	"""A mixin that gives objects a getRunner method and a script attribute.

	The getRunner() method returns a callable that takes the current table
	(we expect db tables, really), the phase and possibly further keyword
	arguments, as appropriate for the phase.

	Objects mixing this in must define an acceptedScriptTypes attribute
	containing a set of script types they support.  Any other script type
	will be rejected.

	Objects mixing this in must also define a method getExpander() returning
	an object mixing in a MacroPackage.
	"""
	# by default, no script type is accepted.
	acceptedScriptTypes = {}

	_scripts = base.StructListAttribute(
		"scripts",
		childFactory=Script,
		description="Code snippets attached to this object.  See Scripting_ .",
		copyable=False)

	def getRunner(self):
		"""returns a function runScripts(table, phase, **kwargs) running all
		scripts attached to self for phase.

		The function is built on the first call and then kept on the instance
		in _runScriptsCache.  For each script with notify set,
		base.ui.notifyScriptRunning is called before the script is run.
		"""
		if hasattr(self, "_runScriptsCache"):
			return self._runScriptsCache

		# phase name -> runners, in the order of self.scripts
		runnersByPhase = {}
		for scriptDef in self.scripts:
			runnerClass = RUNNER_CLASSES[scriptDef.lang]
			runner = runnerClass(scriptDef)
			if scriptDef.type not in runnersByPhase:
				runnersByPhase[scriptDef.type] = []
			runnersByPhase[scriptDef.type].append(runner)

		def runScripts(table, phase, **kwargs):
			for runner in runnersByPhase.get(phase, []):
				if runner.notify:
					base.ui.notifyScriptRunning(runner, self)
				runner.run(table, **kwargs)

		self._runScriptsCache = runScripts
		return runScripts