Source code for gavo.utils.algotricks

"""
Some fundamental algorithms not found in the standard library.
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.

import itertools

from gavo.utils.dachstypes import (Any, Callable, Dict, FrozenSet,
	Generic, Iterator, Hashable, List, Sequence, Set, Tuple, TypeVar, Union)


# A node within an IndexedGraph
IgNode = TypeVar("IgNode")
# A Generic for "local" use
T = TypeVar("T")


class IndexedGraph(Generic[IgNode]):
	"""A graph that is indexed by both incoming and outgoing edges.

	The constructor argument edges is an iterable of (from, to)-tuples,
	where both from and to must be hashable.

	The class keeps a set rootNodes of "root nodes" (i.e. those with
	no incoming edges).

	To find leaf nodes, you'd have to compute allNodes-set(outgoingIndex).

	You may access allNodes, rootNodes, incomingIndex, and outgoingIndex
	reading, but any external change to them will wreak havoc.

	An IndexedGraph is true if it has at least one edge.
	"""
	def __init__(self, edges:List[Tuple[IgNode,IgNode]]):
		# all nodes currently taking part in at least one edge
		self.allNodes: Set[IgNode] = set()
		# nodes without incoming edges
		self.rootNodes = self.allNodes.copy()
		# destination node -> set of the nodes having an edge to it
		self.incomingIndex: Dict[IgNode, Set[IgNode]] = {}
		# source node -> set of the nodes it has an edge to
		self.outgoingIndex: Dict[IgNode, Set[IgNode]] = {}

		for comeFrom, goTo in edges:
			self.addEdge(comeFrom, goTo)

	def __bool__(self):
		# every edge has an entry in incomingIndex, and entries are
		# deleted when their last edge goes away
		return bool(self.incomingIndex)

	def addEdge(self, comeFrom:IgNode, goTo:IgNode) -> None:
		"""adds the edge comeFrom->goTo to the graph.

		Adding an edge that is already present changes nothing.
		"""
		self.incomingIndex.setdefault(goTo, set()).add(comeFrom)
		self.outgoingIndex.setdefault(comeFrom, set()).add(goTo)
		self.allNodes.add(goTo)
		self.allNodes.add(comeFrom)
		# comeFrom is a root as long as nothing points to it; goTo
		# has just gained an incoming edge and thus cannot be one.
		if comeFrom not in self.incomingIndex:
			self.rootNodes.add(comeFrom)
		self.rootNodes.discard(goTo)

	def removeEdge(self, comeFrom:IgNode, goTo:IgNode) -> None:
		"""removes the edge comeFrom->goTo from the graph.

		Nodes that have no edges left after the operation are dropped
		from the graph; a goTo losing its last incoming edge while still
		having outgoing ones becomes a root node.

		A KeyError is raised if there is no such edge.
		"""
		if (comeFrom not in self.outgoingIndex
				or goTo not in self.outgoingIndex[comeFrom]):
			raise KeyError("No edge %s->%s"%(comeFrom, goTo))

		self.outgoingIndex[comeFrom].remove(goTo)
		self.incomingIndex[goTo].remove(comeFrom)

		if not self.outgoingIndex[comeFrom]:
			del self.outgoingIndex[comeFrom]
			if comeFrom not in self.incomingIndex:
				# no edges left at all: the node leaves the graph
				self.allNodes.remove(comeFrom)
				self.rootNodes.discard(comeFrom)

		if not self.incomingIndex[goTo]:
			del self.incomingIndex[goTo]
			self.rootNodes.add(goTo)
			if goTo not in self.outgoingIndex:
				# no edges left at all: the node leaves the graph
				self.allNodes.remove(goTo)
				self.rootNodes.discard(goTo)

	def getEdge(self) -> Tuple[IgNode, IgNode]:
		"""returns an arbitrary edge of the graph as a (from, to) pair.

		The graph is not changed by this.

		It raises an IndexError if the graph is empty.
		"""
		# incomingIndex maps each *destination* to its sources, so the
		# key is the second member of the edge, not the first.
		for goTo, sources in self.incomingIndex.items():
			# sets in the index are never empty (removeEdge deletes them
			# once they are), so there always is a source to hand out
			return next(iter(sources)), goTo
		raise IndexError("Graph has no edges")

	def getSomeRootNode(self) -> IgNode:
		"""returns an arbitrary element of rootNodes.

		The node remains in rootNodes.

		This will raise a KeyError if no root nodes are left.
		"""
		# pop-and-put-back rather than next(iter(...)) so that an empty
		# set yields the documented KeyError
		el = self.rootNodes.pop()
		self.rootNodes.add(el)
		return el
def topoSort(edges:List[Tuple[IgNode, IgNode]]) -> List[IgNode]:
	"""returns the nodes of the graph defined by edges in topological order.

	edges is a sequence of (from, to)-tuples; both members of each tuple
	must be hashable.

	A ValueError is raised if the graph is not acyclic.
	"""
	noTargets: FrozenSet[IgNode] = frozenset()
	graph = IndexedGraph(edges)
	ordered = []

	while graph.rootNodes:
		root = graph.getSomeRootNode()
		ordered.append(root)

		# work on a copy, as removeEdge changes the index while we go
		targets = graph.outgoingIndex.get(root, noTargets).copy()
		for target in targets:
			graph.removeEdge(root, target)
			# a target that has left the graph had no outgoing edges,
			# so it will never show up as a root; emit it right away.
			if target not in graph.allNodes:
				ordered.append(target)

	# edges remaining when there are no roots left mean there's a cycle
	if graph:
		raise ValueError("Graph not acyclic, cycle: %s->%s"%graph.getEdge())

	return ordered
def pathSort(paths:Sequence[str]) -> List[str]:
	"""returns paths sorted such that parents come before their children.

	Parents not given in paths are added to the result.

	>>> pathSort(["a/b/c"])
	['a', 'a/b', 'a/b/c']
	>>> pathSort(["/a/b/c", "/x/y", "/a/b/x", "/a/c"])
	['', '/a', '/a/b', '/a/b/c', '/a/b/x', '/a/c', '/x', '/x/y']
	"""
	expanded = set()
	for path in paths:
		expanded.add(path)
		# whatever is in front of a slash is an ancestor of path
		expanded.update(
			path[:pos] for pos, char in enumerate(path) if char=="/")

	# a parent is a proper prefix of its children and hence sorts first
	return sorted(expanded)
class _Deferred: """A helper class for DeferringDict encapsulating the function call to actualise a value. """ def __init__(self, callable: Callable, args: Sequence=(), kwargs: Dict={}): self.callable, self.args, self.kwargs = callable, args, kwargs def actualize(self) -> Any: return self.callable(*self.args, **self.kwargs)
class DeferringDict(dict):
	"""A dictionary that stores tuples of a callable and its
	arguments and will, on the first access, do the calls.

	This is used below to defer the construction of instances in the class
	resolver to when they are actually used.  This is important with interface
	classes in the registry code, since they usually need the entire system
	up before they can sensibly be built.

	You can assign either a single function or a tuple of a callable
	and its positional and keyword arguments to each key.

	Only item access and iteritems do the deferred calls; other dict
	methods are not overridden here.
	"""
	def __setitem__(self,
			key: str,
			value: Union[Callable, Tuple[Callable, ...]]) -> None:
		if isinstance(value, tuple):
			# (callable, args, kwargs), with the latter two optional
			deferred = _Deferred(*value) # type: ignore
		else:
			deferred = _Deferred(value)
		dict.__setitem__(self, key, deferred)

	def __getitem__(self, key: str) -> Any:
		stored = dict.__getitem__(self, key)
		if not isinstance(stored, _Deferred):
			# already actualised by a previous access
			return stored

		# first access: do the call and keep the result in place of the
		# deferred call so it is not repeated
		result = stored.actualize()
		dict.__setitem__(self, key, result)
		return result

	def iteritems(self) -> Iterator[Tuple[str, Any]]:
		"""iterates over (key, value) pairs, actualising values as it goes.
		"""
		for name in self:
			yield name, self[name]
def commonPrefixLength(t1: Sequence[T], t2: Sequence[T]) -> int:
	"""returns the length of the common prefix of the sequences t1, t2.

	If one sequence is a prefix of the other (or both are equal), this is
	the length of the shorter one.
	"""
	shorterLength = min(len(t1), len(t2))
	for pos in range(shorterLength):
		if t1[pos]!=t2[pos]:
			return pos
	# seqs are identical up to the shorter one's length
	return shorterLength
def chunk(
		items: List[T],
		getChunkTitle: Callable[[T], str],
		getKey: Callable=lambda a: a[0]
		) -> List[Tuple[str, List[T]]]:
	"""returns items in a chunked form.

	getChunkTitle(item)->chunk title is a function returning the chunk title,
	getKey(chunk)->sort key a function returning a sort key for the chunks
	(default is just the chunk title, i.e., chunk[0]).

	The chunked form is a sequence of pairs of a chunk key and a sequence
	of items belonging to that chunk.

	The internal order within each chunk is not disturbed, so you should sort
	items as desired before passing things in.

	This is supposed to work like this:

	>>> chunk(["abakus", "Analysis", "knopf", "Kasper", "kohl"],
	...   lambda item: item[0].lower())
	[('a', ['abakus', 'Analysis']), ('k', ['knopf', 'Kasper', 'kohl'])]
	"""
	grouped: Dict[str, List[T]] = {}
	for item in items:
		title = getChunkTitle(item)
		if title not in grouped:
			grouped[title] = []
		grouped[title].append(item)

	return sorted(grouped.items(), key=getKey)
def identity(val:T) -> T:
	"""returns val unchanged.
	"""
	return val
def uniqueItems(seq:Sequence[Hashable]) -> List[Hashable]:
	"""returns a list of the unique items in seq as they appear in seq.

	Except for order and return type, this is essentially like set(seq).
	"""
	# dict keys are unique and keep the order of their first insertion,
	# which is exactly what we want to hand out here.
	return list(dict.fromkeys(seq))
# When executed as a script, run the doctests embedded in this module.
if __name__=="__main__":  #pragma: no cover
	import doctest
	doctest.testmod()