Source code for keri.app.delegating

# -*- encoding: utf-8 -*-
"""
KERI
keri.app.delegating module

module for enveloping and forwarding KERI message
"""

from hio.base import doing
from hio.help import ogler

from .agenting import WitnessInquisitor, Receiptor, WitnessPublisher
from .forwarding import Poster
from .habbing import GroupHab

from ..kering import ValidationError
from ..core import Number, Diger, Seqner, SerderKERI, NumDex, exchange
from ..peer import specialExchange

logger = ogler.getLogger()


[docs] class Anchorer(doing.DoDoer): """Anchorer subclass of DoDoer Sends messages to Delegator of an identifier and wait for the anchoring event to be processed to ensure the inception or rotation event has been approved by the delegator. Removes all Doers and exits as Done once the event has been anchored. """
[docs] def __init__(self, hby, proxy=None, auths=None, **kwa): """ Initialize Anchorer. Parameters: hby (Habery): local controller database for this Anchorer proxy (Hab): optional proxy Habitat to use for sending messages auths (list[str]): TOTP authentication codes to send to witnesses to authorize event receipting """ self.hby = hby self.postman = Poster(hby=hby) self.witq = WitnessInquisitor(hby=hby) self.witDoer = Receiptor(hby=self.hby) self.publishers = dict() self.proxy = proxy self.auths = auths super(Anchorer, self).__init__(doers=[self.witq, self.witDoer, self.postman, doing.doify(self.escrowDo)], **kwa)
[docs] def delegation(self, pre, sn=None, proxy=None, auths=None): """ Add witness publishers by prefix and send delegation event to witnesses and place event on the delegation partial witness escrow. Parameters: pre (str): qb64 identifier prefix of delegated identifier sn (int): optional sequence number of event to anchor, defaults to latest event proxy (Hab): optional proxy Habitat to use for sending messages auths (list[str]): TOTP authentication codes to send to witnesses to authorize event receipting """ if pre not in self.hby.habs: raise ValidationError(f"{pre} is not a valid local AID for delegation") if proxy is not None: self.proxy = proxy self.publishers[pre] = WitnessPublisher(hby=self.hby) # load the hab of the delegated identifier to anchor hab = self.hby.habs[pre] delpre = hab.kever.delpre # get the delegator identifier if delpre not in hab.kevers: raise ValidationError(f"delegator {delpre} not found, unable to process delegation") sn = sn if sn is not None else hab.kever.sner.num self.auths = auths if auths is not None else self.auths # load the event and signatures evt = hab.msgOwnEvent(sn=sn) # Send exn message for notification purposes srdr = SerderKERI(raw=evt) self.witDoer.msgs.append(dict(pre=pre, sn=srdr.sn, auths=self.auths)) self.hby.db.dpwe.pin(keys=(srdr.pre, srdr.said), val=srdr)
[docs] def complete(self, prefixer, number, diger=None): """ Check for completed delegation for the specific delegation event Parameters: prefixer (Prefixer): qb64 identifier prefix of event to check number (Number.huge): number of event to check diger (Diger): optional digest of event to verify Returns: bool: True if delegation protocol is complete, False otherwise """ cdiger = self.hby.db.cdel.get(keys=prefixer.qb64b, on=number.sn) if not cdiger: return False else: if diger and (cdiger.qb64 != diger.qb64): raise ValidationError(f"invalid delegation protocol escrowed event {cdiger.qb64}-{diger.qb64}") return True
[docs] def escrowDo(self, tymth, tock=1.0, **kwa): """ Process escrows of delegation events waiting to be completed. Steps involve: 1. Sending local event with sig to other participants 2. Waiting for signature threshold to be met. 3. If elected and delegated identifier, send complete event to delegator 4. If delegated, wait for delegator's anchored seal 5. If elected, send event to witnesses and collect receipts. 6. Otherwise, wait for fully receipted event Parameters: tymth (function): injected function wrapper closure returned by .tymen() of Tymist instance. Calling tymth() returns associated Tymist .tyme. tock (float): injected initial tock value. Default to 1.0 to slow down processing """ # enter context self.wind(tymth) self.tock = tock _ = (yield self.tock) while True: self.processEscrows() yield 0.5
[docs] def processEscrows(self): """Process delegation escrows""" self.processPartialWitnessEscrow() self.processUnanchoredEscrow() self.processWitnessPublication()
[docs] def processUnanchoredEscrow(self): """ Process escrow of unanchored delegation events. Message processing will send this local controller's event to witnesses. """ for (pre, said), serder in self.hby.db.dune.getTopItemIter(): # delegated unanchored escrow kever = self.hby.kevers[pre] dkever = self.hby.kevers[kever.delpre] seal = dict(i=serder.pre, s=serder.snh, d=serder.said) if dserder := self.hby.db.fetchLastSealingEventByEventSeal(dkever.prefixer.qb64, seal=seal): sner = Number(num=dserder.sn, code=NumDex.Huge) diger = Diger(qb64b=dserder.saidb) self.hby.db.aess.pin(keys=(kever.prefixer.qb64b, kever.serder.saidb), val=(sner, diger)) # authorizer event seal (delegator/issuer) # Move to escrow waiting for witness receipts logger.info(f"Delegation approval received, {serder.pre} confirmed, publishing to my witnesses") self.publishDelegator(pre) self.hby.db.dpub.put(keys=(pre, said), val=serder) self.hby.db.dune.rem(keys=(pre, said))
[docs] def processPartialWitnessEscrow(self): """ Process escrow of delegated events that do not have a full complement of receipts from witnesses yet. When receipting is complete, remove from escrow and cue up a message that the event is complete. """ for (pre, said), serder in self.hby.db.dpwe.getTopItemIter(): # group partial witness escrow kever = self.hby.kevers[pre] seqner = Seqner(sn=serder.sn) # Load all the witness receipts we have so far wigers = self.hby.db.wigs.get(keys=(pre, serder.said)) if len(wigers) == len(kever.wits): # We have all of them, this event is finished if len(kever.wits) > 0: witnessed = False for cue in self.witDoer.cues: if cue["pre"] == serder.pre and cue["sn"] == seqner.sn: witnessed = True if not witnessed: continue logger.info(f"Witness receipts complete, waiting for delegation approval.") if pre not in self.hby.habs: continue hab = self.hby.habs[pre] delpre = hab.kever.delpre # get the delegator identifier dkever = hab.kevers[delpre] # and the delegator's kever smids = [] if isinstance(hab, GroupHab): phab = hab.mhab smids = hab.smids elif self.proxy is not None: phab = self.proxy else: raise ValidationError("no proxy to send messages for delegation") evt = hab.db.cloneEvtMsg(pre=serder.pre, fn=0, dig=serder.said) srdr = SerderKERI(raw=evt) exn, atc = delegateRequestExn(phab, delpre=delpre, evt=bytes(evt), aids=smids) logger.info( "Sending delegation request exn for %s from %s to delegator %s", srdr.ilk, phab.pre, delpre) logger.debug("Delegation request=\n%s\n", exn.pretty()) self.postman.send(hab=phab, dest=hab.kever.delpre, topic="delegate", serder=exn, attachment=atc) del evt[:srdr.size] logger.info("Sending delegation event %s from %s to delegator %s", srdr.ilk, phab.pre, delpre) logger.debug("Delegated inception=\n%s\n", srdr.pretty()) self.postman.send(hab=phab, dest=delpre, topic="delegate", serder=srdr, attachment=evt) seal = dict(i=srdr.pre, s=srdr.snh, d=srdr.said) self.witq.query(hab=phab, pre=dkever.prefixer.qb64, anchor=seal) self.hby.db.dpwe.rem(keys=(pre, said)) self.hby.db.dune.pin(keys=(srdr.pre, srdr.said), val=srdr)
[docs] def processWitnessPublication(self): """ Process escrow of partially signed delegation events. Message processing waits for publication to the witnesses to complete then completes the delegation. """ for (pre, said), serder in self.hby.db.dpub.getTopItemIter(): # group partial witness escrow if pre not in self.publishers: continue publisher = self.publishers[pre] if not publisher.idle: continue self.remove([publisher]) del self.publishers[pre] self.hby.db.dpub.rem(keys=(pre, said)) self.hby.db.cdel.put(keys=pre, on=serder.sn, val=Diger(qb64=serder.said))
[docs] def publishDelegator(self, pre): """Publish the delegation event to my witnesses.""" if pre not in self.publishers: return publisher = self.publishers[pre] hab = self.hby.habs[pre] self.extend([publisher]) for msg in hab.db.cloneDelegation(hab.kever): publisher.msgs.append(dict(pre=hab.pre, msg=bytes(msg)))
[docs] def loadHandlers(hby, exc, notifier): """ Load handlers for the peer-to-peer delegation protocols Parameters: hby (Habery): Database and keystore for environment exc (Exchanger): Peer-to-peer message router notifier (Notifier): Outbound notifications """ delreq = DelegateRequestHandler(hby=hby, notifier=notifier) exc.addHandler(delreq)
[docs] class DelegateRequestHandler: """ Handler for multisig group inception notification EXN messages """ resource = "/delegate/request"
[docs] def __init__(self, hby, notifier): """ Parameters: hby (Habery) database environment for this handler notifier (str) notifier for converting delegate request exn messages to controller notifications """ self.hby = hby self.notifier = notifier
[docs] def handle(self, serder, attachments=None): """ Do route specific processsing of delegation request messages Parameters: serder (Serder): Serder of the exn delegation request message attachments (list): list of tuples of pather, CESR SAD path attachments to the exn event """ src = serder.pre pay = serder.ked['a'] embeds = serder.ked['e'] delpre = pay["delpre"] if delpre not in self.hby.habs: logger.error(f"invalid delegate request message, no local delpre for evt=: {pay}") return data = dict( src=src, r='/delegate/request', delpre=delpre, ked=embeds["evt"] ) if "aids" in pay: data["aids"] = pay["aids"] self.notifier.add(attrs=data)
[docs] def delegateRequestExn(hab, delpre, evt, aids=None): """ Parameters: hab (Hab): database environment of sender delpre (str): qb64 AID of delegator evt (bytes): serialized and signed event requiring delegation approval aids (list): list of multisig AIDs participating Returns: """ data = dict( delpre=delpre, ) embeds = dict( evt=evt ) if aids is not None: data["aids"] = aids # Create `exn` peer to peer message to notify other participants UI exn, _ = specialExchange(sender=hab.pre, route=DelegateRequestHandler.resource, modifiers=dict(), attributes=data, embeds=embeds) ims = hab.endorse(serder=exn, last=False, framed=True) del ims[:exn.size] return exn, ims