# -*- encoding: utf-8 -*-
"""
KERI
keri.app.grouping module
module for coordinating group multisig KERI events and peer-to-peer messages
"""
from hio.base import doing
from hio.help import decking
from .. import kering
from .. import help
from ..app import delegating, agenting
from ..core import coring, routing, eventing, parsing, serdering
from ..db import dbing
from ..db.dbing import snKey
from ..peer import exchanging
# Module-level logger obtained from the shared help.ogler.
logger = help.ogler.getLogger()
[docs]
class Counselor(doing.DoDoer):
    """ DoDoer that moves group multisig events through their escrows

    Runs a delegation doer (``swain``), a WitnessInquisitor, a Receiptor and the
    ``escrowDo`` generator method as child doers.

    """

    def __init__(self, hby, swain=None, proxy=None, **kwa):
        """ Create the Counselor and register its child doers

        Parameters:
            hby (Habery): database and Hab environment shared with the child doers
            swain (Sealer): optional delegation doer; a delegating.Sealer is created
                for hby when None
            proxy: optional value passed as ``hab`` when querying for a delegator anchor
            **kwa (dict): extra keyword arguments forwarded to the DoDoer initializer

        """
        self.hby = hby
        if swain is None:
            swain = delegating.Sealer(hby=self.hby)
        self.swain = swain
        self.proxy = proxy
        self.witDoer = agenting.Receiptor(hby=self.hby)
        self.witq = agenting.WitnessInquisitor(hby=hby)

        # escrowDo is a generator method, doify wraps it so it can run as a doer
        doers = [self.swain, self.witq, self.witDoer]
        doers.append(doing.doify(self.escrowDo))
        super().__init__(doers=doers, **kwa)
[docs]
def start(self, ghab, prefixer, seqner, saider):
    """ Begin escrow processing of a group multisig event

    Places the event in the group partially signed escrow (gpse), the first
    of the escrows walked by processEscrows.

    Parameters:
        ghab (Hab): group Habitat
        prefixer (Prefixer): prefixer of group identifier
        seqner (Seqner): seqner of event of group identifier
        saider (Saider): saider of event of group identifier

    Returns:
        result of adding the (seqner, saider) entry to the gpse escrow

    """
    evt = ghab.makeOwnEvent(sn=seqner.sn, allowPartiallySigned=True)
    serder = serdering.SerderKERI(raw=evt)
    # Strip the serialized event from the front of the local buffer
    del evt[:serder.size]

    print(f"Waiting for other signatures for {serder.pre}:{seqner.sn}...")

    escrow = self.hby.db.gpse
    return escrow.add(keys=(prefixer.qb64,), val=(seqner, saider))
[docs]
def complete(self, prefixer, seqner, saider=None):
    """ Check whether the multisig protocol has completed for a specific event

    Parameters:
        prefixer (Prefixer): identifier prefix of event to check
        seqner (Seqner): sequence number of event to check
        saider (Saider): optional digest of event to verify

    Returns:
        bool: True when an entry for (prefix, sequence number) exists in the
              completed group multisig database (cgms), False otherwise

    Raises:
        kering.ValidationError: when saider is provided and does not match the
              digest recorded for the completed event

    """
    csaider = self.hby.db.cgms.get(keys=(prefixer.qb64, seqner.qb64))
    if not csaider:  # nothing recorded as complete yet
        return False

    if saider and csaider.qb64 != saider.qb64:
        raise kering.ValidationError(f"invalid multisig protocol escrowed event {csaider.qb64}-{saider.qb64}")

    return True
[docs]
def escrowDo(self, tymth, tock=1.0):
    """ Generator that repeatedly processes the group multisig escrows

    Steps involve:
       1. Sending local event with sig to other participants
       2. Waiting for signature threshold to be met.
       3. If elected and delegated identifier, send complete event to delegator
       4. If delegated, wait for delegator's anchored seal
       5. If elected, send event to witnesses and collect receipts.
       6. Otherwise, wait for fully receipted event

    Parameters:
        tymth (function): injected function wrapper closure returned by .tymen() of
            Tymist instance. Calling tymth() returns associated Tymist .tyme.
        tock (float): injected initial tock value.  Default to 1.0 to slow down processing

    Yields:
        float: the initial tock once on entry, then 0.5 after every escrow pass

    """
    # enter context
    self.wind(tymth)
    self.tock = tock
    yield self.tock

    while True:
        self.processEscrows()
        yield 0.5
def processEscrows(self):
    """ Run one pass over each group multisig escrow

    Order is: partially signed, then delegation, then partial witness.

    """
    passes = (
        self.processPartialSignedEscrow,
        self.processDelegateEscrow,
        self.processPartialWitnessEscrow,
    )
    for sweep in passes:
        sweep()
[docs]
def processPartialSignedEscrow(self):
    """
    Process the group partially signed escrow (gpse).

    For each escrowed event whose digest is found by sequence number, the entry
    is removed from gpse and then either:
      - moved to the delegation escrow (gdee) when the group is delegated and
        the latest event is a dip or drt, or
      - moved to the partial witness escrow (gpwe) otherwise.
    The elected member additionally hands the event to the delegation doer or
    to the witness receiptor respectively.

    """
    db = self.hby.db
    for (pre,), (seqner, saider) in db.gpse.getItemIter():  # group partially signed escrow
        sdig = db.getKeLast(key=dbing.snKey(pre, seqner.sn))
        if not sdig:  # no digest stored under this sn key yet, leave it escrowed
            continue

        db.gpse.rem(keys=(pre,))
        ghab = self.hby.habs[pre]
        kever = ghab.kever

        sigs = db.getSigs(dbing.dgKey(pre, bytes(sdig)))
        if not sigs:  # otherwise its a list of sigs
            continue

        # Lowest signing index among the attached signatures
        windex = min(coring.Siger(qb64b=bytes(sig)).index for sig in sigs)

        # True if Elected to perform delegation and witnessing
        elected = ghab.mhab.kever.verfers[0].qb64 == kever.verfers[windex].qb64

        if kever.delegated and kever.ilk in (coring.Ilks.dip, coring.Ilks.drt):
            # We are a delegated identifier, must wait for delegator approval for dip and drt
            if elected:  # We are elected to perform delegation and witnessing messaging
                print(f"We are the witnesser, sending {pre} to delegator")
                self.swain.delegation(pre=pre, sn=seqner.sn)
            else:
                anchor = dict(i=pre, s=seqner.snh, d=saider.qb64)
                # Query through the proxy when one was supplied, else as our member AID
                if self.proxy:
                    source = dict(hab=self.proxy)
                else:
                    source = dict(src=ghab.mhab.pre)
                self.witq.query(pre=kever.delegator, anchor=anchor, **source)

            print("Waiting for delegation approval...")
            db.gdee.add(keys=(pre,), val=(seqner, saider))
            continue

        # Non-delegation, move on to witnessing
        if elected:  # We are elected witnesser, send off event to witnesses
            print(f"We are the fully signed witnesser {seqner.sn}, sending to witnesses")
            self.witDoer.msgs.append(dict(pre=pre, sn=seqner.sn))

        # Move to escrow waiting for witness receipts
        print(f"Waiting for fully signed witness receipts for {seqner.sn}")
        db.gpwe.add(keys=(pre,), val=(seqner, saider))
[docs]
def processDelegateEscrow(self):
    """
    Process the group delegatee escrow (gdee): delegated group multisig events
    that are waiting for delegator approval.

    The elected member waits for the delegation doer to report completion and
    then records the event as complete (cgms).  Every other member looks for the
    delegator's anchoring event, stores the authorizer seal and moves the entry
    to the partial witness escrow (gpwe).

    """
    db = self.hby.db
    for (pre,), (seqner, saider) in db.gdee.getItemIter():  # group delegatee escrow
        anchor = dict(i=pre, s=seqner.snh, d=saider.qb64)
        ghab = self.hby.habs[pre]
        kever = ghab.kevers[pre]

        # We are elected to perform delegation and witnessing
        elected = ghab.mhab.kever.verfers[0].qb64 == kever.verfers[0].qb64

        if elected:  # Our part was already handed to the delegation doer, just wait for it
            if self.swain.complete(prefixer=kever.prefixer, seqner=coring.Seqner(sn=kever.sn)):
                db.gdee.rem(keys=(pre,))
                print(f"Delegation approval for {pre} received.")
                db.cgms.put(keys=(pre, seqner.qb64), val=saider)
            continue

        # Not witnesser, we need to look for the anchor and then wait for receipts
        serder = db.findAnchoringSealEvent(kever.delegator, seal=anchor)
        if not serder:
            continue

        couple = coring.Seqner(sn=serder.sn).qb64b + serder.saidb
        db.setAes(dbing.dgKey(pre, saider.qb64b), couple)  # authorizer event seal (delegator/issuer)
        db.gdee.rem(keys=(pre,))
        print(f"Delegation approval for {pre} received.")

        # Move to escrow waiting for witness receipts
        print(f"Waiting for witness receipts for {pre}")
        # NOTE(review): repeats the gdee removal made just above; kept unchanged
        db.gdee.rem(keys=(pre,))
        db.gpwe.add(keys=(pre,), val=(seqner, saider))
[docs]
def processPartialWitnessEscrow(self):
    """
    Process the group partial witness escrow (gpwe): group multisig events that
    do not yet have a full complement of witness receipts.

    When the number of stored witness signatures equals the number of witnesses,
    the entry is removed from escrow and recorded as complete (cgms).  The
    elected member, when there are witnesses, first waits for a matching cue
    from its Receiptor.  While receipts are missing, non-elected members ask
    the Receiptor to fetch them.

    """
    db = self.hby.db
    for (pre,), (seqner, saider) in db.gpwe.getItemIter():  # group partial witness escrow
        kever = self.hby.kevers[pre]

        # Load all the witness receipts we have so far
        wigs = db.getWigs(dbing.dgKey(pre, saider.qb64))
        ghab = self.hby.habs[pre]
        elected = ghab.mhab.kever.verfers[0].qb64 == kever.verfers[0].qb64

        if len(wigs) != len(kever.wits):  # still missing receipts
            if not elected:
                self.witDoer.gets.append(dict(pre=pre, sn=seqner.sn))
            continue

        # We have all of them, this event is finished
        if elected and len(kever.wits) > 0:
            # Scan every cue (no early exit) for one matching this event
            witnessed = any([cue["pre"] == ghab.pre and cue["sn"] == seqner.sn
                             for cue in self.witDoer.cues])
            if not witnessed:
                continue

        print(f"Witness receipts complete, {pre} confirmed.")
        db.gpwe.rem(keys=(pre,))
        db.cgms.put(keys=(pre, seqner.qb64), val=saider)
[docs]
class MultisigNotificationHandler:
    """
    Exn handler that hands multisig coordination messages to a multiplexor

    Attributes:
        resource (str): route this handler is registered for
        mux: object whose ``add(serder=...)`` receives each handled message

    """

    def __init__(self, resource, mux):
        """ Bind the handler to one route and one multiplexor

        Parameters:
            resource (str): route of the exn messages to handle
            mux: multisig communication coordinator that receives the messages

        """
        self.mux = mux
        self.resource = resource

    def handle(self, serder, attachments=None):
        """ Forward a multisig exn message to the multiplexor

        Parameters:
            serder (Serder): Serder of the exn multisig message
            attachments (list): attachments to the exn event; accepted but not used here

        """
        mux = self.mux
        mux.add(serder=serder)
[docs]
def loadHandlers(exc, mux):
    """ Register one MultisigNotificationHandler per group multisig route

    Parameters:
        exc (Exchanger): Peer-to-peer message router
        mux (Multiplexor): Multisig communication coordinator

    """
    # Registration order matches the order of this tuple
    routes = (
        "/multisig/icp",
        "/multisig/rot",
        "/multisig/ixn",
        "/multisig/vcp",
        "/multisig/iss",
        "/multisig/rev",
        "/multisig/exn",
        "/multisig/rpy",
    )
    for route in routes:
        exc.addHandler(MultisigNotificationHandler(resource=route, mux=mux))
[docs]
def multisigInceptExn(hab, smids, rmids, icp, delegator=None):
    """ Create a peer to peer message to propose a group multisig inception event

    Args:
        hab (Hab): habitat of local multisig member AID
        smids (list): list of qb64 AIDs of members with signing authority
        rmids (list): list of qb64 AIDs of members with rotation authority,
            smids is used when None
        icp (bytes): serialized inception event with CESR streamed attachments
        delegator (str): qb64 AID of Delegator if group multisig is a delegated AID

    Returns:
        tuple: (Serder, bytes): Serder of exn message and CESR attachments

    """
    if rmids is None:
        rmids = smids

    serder = serdering.SerderKERI(raw=icp)
    payload = {"gid": serder.pre, "smids": smids, "rmids": rmids}
    if delegator is not None:
        payload["delegator"] = delegator

    # Create `exn` peer to peer message to notify other participants UI
    exn, end = exchanging.exchange(route="/multisig/icp", modifiers={}, payload=payload,
                                   embeds={"icp": icp}, sender=hab.pre)

    # Keep only the endorsement attachments, then append the embed attachments
    ims = hab.endorse(serder=exn, last=False, pipelined=False)
    del ims[:exn.size]
    ims.extend(end)

    return exn, ims
[docs]
def multisigRotateExn(ghab, smids, rmids, rot):
    """ Create a peer to peer message to propose a group multisig rotation event

    Args:
        ghab (GroupHab): habitat of group multisig AID
        smids (list): list of qb64 AIDs of members with signing authority
        rmids (list): list of qb64 AIDs of members with rotation authority
        rot (bytes): serialized rotation event with CESR streamed attachments

    Returns:
        tuple: (Serder, bytes): Serder of exn message and CESR attachments

    """
    payload = {"gid": ghab.pre, "smids": smids, "rmids": rmids}
    exn, end = exchanging.exchange(route="/multisig/rot", modifiers={}, payload=payload,
                                   sender=ghab.mhab.pre, embeds={"rot": rot})

    # The local member endorses the exn; drop the exn itself and keep the attachments
    ims = ghab.mhab.endorse(serder=exn, last=False, pipelined=False)
    atc = bytearray(ims[exn.size:])
    atc.extend(end)

    return exn, atc
[docs]
def multisigInteractExn(ghab, aids, ixn):
    """ Create a peer to peer message to propose a multisig group interaction event

    Parameters:
        ghab (GroupHab): group Hab whose local member endorses the message
        aids (list): qb64 identifier prefixes, sent as ``smids`` in the payload
        ixn (bytes): serialized interaction event with CESR streamed attachments

    Returns:
        tuple: (Serder, bytes): Serder of exn message and CESR attachments

    """
    payload = {"gid": ghab.pre, "smids": aids}
    exn, end = exchanging.exchange(route="/multisig/ixn", modifiers={}, payload=payload,
                                   sender=ghab.mhab.pre, embeds={"ixn": ixn})

    # Drop the exn itself from the endorsed stream and keep the attachments
    ims = ghab.mhab.endorse(serder=exn, last=False, pipelined=False)
    atc = bytearray(ims[exn.size:])
    atc.extend(end)

    return exn, atc
[docs]
def multisigRegistryInceptExn(ghab, usage, vcp, anc):
    """ Create a peer to peer message to propose a credential registry inception from a multisig group identifier

    Parameters:
        ghab (GroupHab): group Hab whose local member endorses the message to send
        usage (str): human readable reason for creating the credential registry
        vcp (bytes): serialized Credentials registry inception event
        anc (bytes): CESR stream of serialized and signed event anchoring registry inception event

    Returns:
        tuple: (Serder, bytes): Serder of exn message and CESR attachments

    """
    payload = {'gid': ghab.pre, 'usage': usage}
    embeds = {"vcp": vcp, "anc": anc}
    exn, end = exchanging.exchange(route="/multisig/vcp", payload=payload,
                                   sender=ghab.mhab.pre, embeds=embeds)

    # Drop the exn itself from the endorsed stream and keep the attachments
    evt = ghab.mhab.endorse(serder=exn, last=False, pipelined=False)
    atc = bytearray(evt[exn.size:])
    atc.extend(end)

    return exn, atc
[docs]
def multisigIssueExn(ghab, acdc, iss, anc):
    """ Create a peer to peer message to propose a credential creation from a multisig group identifier

    Parameters:
        ghab (GroupHab): group Hab whose local member endorses the message to send
        acdc (bytes): serialized Credential
        iss (bytes): CESR stream of serialized TEL issuance event
        anc (bytes): CESR stream of serialized and signed anchoring event anchoring creation

    Returns:
        tuple: (Serder, bytes): Serder of exn message and CESR attachments

    """
    embeds = {"acdc": acdc, "iss": iss, "anc": anc}
    exn, end = exchanging.exchange(route="/multisig/iss", payload={'gid': ghab.pre},
                                   sender=ghab.mhab.pre, embeds=embeds)

    # Drop the exn itself from the endorsed stream and keep the attachments
    evt = ghab.mhab.endorse(serder=exn, last=False, pipelined=False)
    atc = bytearray(evt[exn.size:])
    atc.extend(end)

    return exn, atc
[docs]
def multisigRevokeExn(ghab, said, rev, anc):
    """ Create a peer to peer message to propose a credential revocation from a multisig group identifier

    Parameters:
        ghab (GroupHab): group Hab whose local member endorses the message to send
        said (str): qb64 SAID of credential being revoked
        rev (bytes): CESR stream of serialized TEL revocation event
        anc (bytes): CESR stream of serialized and signed anchoring event anchoring revocation

    Returns:
        tuple: (Serder, bytes): Serder of exn message and CESR attachments

    """
    payload = {'gid': ghab.pre, 'said': said}
    embeds = {"rev": rev, "anc": anc}
    exn, end = exchanging.exchange(route="/multisig/rev", payload=payload,
                                   sender=ghab.mhab.pre, embeds=embeds)

    # Drop the exn itself from the endorsed stream and keep the attachments
    evt = ghab.mhab.endorse(serder=exn, last=False, pipelined=False)
    atc = bytearray(evt[exn.size:])
    atc.extend(end)

    return exn, atc
[docs]
def multisigRpyExn(ghab, rpy):
    """ Create a peer to peer message carrying a reply (rpy) message from a multisig group identifier

    Parameters:
        ghab (GroupHab): group Hab whose local member endorses the message to send
        rpy (bytes): CESR stream of serialized reply event

    Returns:
        tuple: (Serder, bytes): Serder of exn message and CESR attachments

    """
    exn, end = exchanging.exchange(route="/multisig/rpy", payload={'gid': ghab.pre},
                                   sender=ghab.mhab.pre, embeds={"rpy": rpy})

    # Drop the exn itself from the endorsed stream and keep the attachments
    evt = ghab.mhab.endorse(serder=exn, last=False, pipelined=False)
    atc = bytearray(evt[exn.size:])
    atc.extend(end)

    return exn, atc
[docs]
def multisigExn(ghab, exn):
    """ Create a peer to peer message wrapping another exn message from a multisig group identifier

    Parameters:
        ghab (GroupHab): group Hab whose local member endorses the message to send
        exn (bytes): CESR stream of serialized exchange message, with signatures

    Returns:
        tuple: (Serder, bytes): Serder of the wrapping exn message and CESR attachments

    """
    wexn, end = exchanging.exchange(route="/multisig/exn", payload={'gid': ghab.pre},
                                    sender=ghab.mhab.pre, embeds={"exn": exn})

    # Drop the wrapper exn itself from the endorsed stream and keep the attachments
    evt = ghab.mhab.endorse(serder=wexn, last=False, pipelined=False)
    atc = bytearray(evt[wexn.size:])
    atc.extend(end)

    return wexn, atc
def getEscrowedEvent(db, pre, sn):
    """ Rebuild an event message with its stored signatures and optional seal couple

    The event digest is read with db.getPseLast for the (pre, sn) key, falling
    back to db.getKeLast when that returns None.

    Parameters:
        db: database providing getPseLast, getKeLast, getEvt, getSigsIter and getAes
        pre (str): identifier prefix of the event
        sn (int): sequence number of the event

    Returns:
        bytearray: raw event followed by a ControllerIdxSigs counter, each stored
                   signature and, when a couple is stored, a SealSourceCouples
                   counter and the couple

    """
    snkey = snKey(pre, sn)
    dig = db.getPseLast(snkey)
    if dig is None:
        dig = db.getKeLast(snkey)

    dgkey = dbing.dgKey(pre, bytes(dig))  # digest key
    raw = db.getEvt(dgkey)
    serder = serdering.SerderKERI(raw=bytes(raw))

    sigers = [coring.Siger(qb64b=bytes(sig)) for sig in db.getSigsIter(dgkey)]
    couple = db.getAes(dgkey)

    msg = bytearray()
    msg.extend(serder.raw)

    counter = coring.Counter(code=coring.CtrDex.ControllerIdxSigs, count=len(sigers))
    msg.extend(counter.qb64b)  # attach cnt
    for siger in sigers:
        msg.extend(siger.qb64b)  # attach sig

    if couple is None:
        return msg

    msg.extend(coring.Counter(code=coring.CtrDex.SealSourceCouples, count=1).qb64b)
    msg.extend(couple)
    return msg
[docs]
class Multiplexor:
    """ Multiplexor (mux) coordinates peer-to-peer messages between group multisig participants

    When new messages arrive the Mux associates the SAID of the embedded messages with the SAID of the exn
    message as well as with the sender.  This lets the controller of the local participant know who has sent
    which messages and whether they match.  In addition, if the controller of the local participant has already
    approved the messages embedded in an exn, the messages are passed through a non-local parser.

    Attributes:
        hby (habbing.Habery): database environment for local Habs
        rtr (routing.Router): routes reply 'rpy' messages
        rvy (routing.Revery): factory that processes reply 'rpy' messages
        exc (exchanging.Exchanger): processor and router for peer-to-peer msgs
        kvy (eventing.Kevery): event processor created with lax=False and local=False
        psr (parsing.Parser): parses messages for .kvy, .rvy and .exc
        notifier (notifying.Notifier): stores notices for human consumption

    Parameters:
        hby (habbing.Habery): database environment for local Habs
        notifier (notifying.Notifier): stores notices for human consumption

    """

    def __init__(self, hby, notifier):
        """ Create Multiplexor for local database and Habs

        Parameters:
            hby (habbing.Habery): database environment for local Habs
            notifier (notifying.Notifier): stores notices for human consumption

        """
        self.hby = hby
        self.notifier = notifier

        db = self.hby.db
        self.rtr = routing.Router()
        self.rvy = routing.Revery(db=db, rtr=self.rtr)
        self.exc = exchanging.Exchanger(hby=self.hby, handlers=[])

        self.kvy = eventing.Kevery(db=db, lax=False, local=False, rvy=self.rvy)
        self.kvy.registerReplyRoutes(router=self.rtr)

        self.psr = parsing.Parser(framed=True, kvy=self.kvy, rvy=self.rvy, exc=self.exc)
[docs]
def add(self, serder):
""" Process /multisig message by associating the exn with the SAID of the embedded event section
Adds the exn message contained in `serder` to the set of messages received for a given set of embedded
events. Ensures this is a /multisig message with the correct properties and then stores the SAID of the
exn message and the prefix of the sender associated with the SAID of the embedded event section. Also
sends the controller of the local participant a notice.
This method will extract and parse the embedded events if the local participant has already approved the
events so that any addition signatures can be processed.
Parameters:
serder (coring.Serder): peer-to-peer exn "/multisig" message to coordinate from other participants
Returns:
"""
ked = serder.ked
if 'e' not in ked: # No embedded events
return
embed = ked['e']
esaid = embed['d']
sender = ked['i']
route = ked['r']
payload = ked['a']
# Route specific logic to ensure this is a valid exn for a local participant.
match route.split("/"):
case ["", "multisig", "icp"]:
mids = payload["smids"]
if "rmids" in payload:
mids.extend(payload["rmids"])
member = any([True for mid in mids if mid in self.hby.kevers])
if not member:
raise ValueError(f"invalid request to join group, not member in mids={mids}")
case ["", "multisig", "rot"]:
gid = payload["gid"]
if gid not in self.hby.habs:
mids = payload["smids"]
mids.extend(payload["rmids"])
member = any([True for mid in mids if mid in self.hby.kevers])
if not member:
raise ValueError(f"invalid request to join group, not member in mids={mids}")
case ["", "multisig", *_]:
gid = payload["gid"]
if gid not in self.hby.habs:
raise ValueError(f"invalid request to participate in group, not member of gid={gid}")
case _:
raise ValueError(f"invalid route {route} for multiplexed exn={ked}")
if len(self.hby.db.meids.get(keys=(esaid,))) == 0: # No one has submitted this message yet
if sender not in self.hby.habs: # We are not sending this one, notify us
data = dict(
r=route,
d=serder.said
)
self.notifier.add(attrs=data)
self.hby.db.meids.add(keys=(esaid,), val=coring.Saider(qb64=serder.said))
self.hby.db.maids.add(keys=(esaid,), val=coring.Prefixer(qb64=serder.pre))
submitters = self.hby.db.maids.get(keys=(esaid,))
if sender not in self.hby.habs: # We are not sending this one, need to parse if already approved
# If we've already submitted an identical payload, parse this one because we've approved it
approved = any([True for sub in submitters if sub.qb64 in self.hby.kevers])
if approved:
# Clone exn from database, ensuring it is stored with valid signatures
exn, paths = exchanging.cloneMessage(self.hby, said=serder.said)
e = exn.ked['e']
ims = bytearray()
# Loop through all the embedded events, extract the attachments for those events...
for key, val in e.items():
if not isinstance(val, dict):
continue
sadder = coring.Sadder(ked=val)
ims.extend(sadder.raw)
if key in paths:
atc = paths[key]
ims.extend(atc)
# ... and parse
self.psr.parse(ims=ims)
else:
# Should we prod the user with another submission if we haven't already approved it?
route = ked['r']
data = dict(
r=route,
d=serder.said,
e=embed['d']
)
self.notifier.add(attrs=data)
def get(self, esaid):
    """ Return the exn messages recorded for an embedded event section SAID

    Parameters:
        esaid (str): SAID of the embedded event section

    Returns:
        list: one dict per recorded exn with keys ``exn`` (the exn's ked) and
              ``paths`` (attachments keyed by label, decoded as utf-8 text)

    """
    exns = []
    for saider in self.hby.db.meids.get(keys=(esaid,)):
        exn, paths = exchanging.cloneMessage(hby=self.hby, said=saider.qb64)
        decoded = {}
        for label, atc in paths.items():
            decoded[label] = atc.decode("utf-8")
        exns.append({"exn": exn.ked, "paths": decoded})

    return exns