# -*- encoding: utf-8 -*-
"""
keri.peer.exchanging module
"""
import logging
from datetime import timedelta
from hio.help import decking
from .. import help, kering
from ..app import habbing
from ..core import eventing, coring, serdering
from ..help import helping
from ..kering import ValidationError, MissingSignatureError
ExchangeMessageTimeWindow = timedelta(seconds=300)
logger = help.ogler.getLogger()
[docs]
class Exchanger:
"""
Peer to Peer KERI message Exchanger.
"""
[docs]
def __init__(self, hby, handlers, cues=None, delta=ExchangeMessageTimeWindow):
""" Initialize instance
Parameters:
hby (Haberyu): database environment
handlers(list): list of Handlers capable of responding to exn messages
cues (Deck): of Cues i.e. notices of requests needing response
delta (timedelta): message timeout window
"""
self.hby = hby
self.kevers = self.hby.db.kevers
self.delta = delta
self.routes = dict()
self.cues = cues if cues is not None else decking.Deck() # subclass of deque
for handler in handlers:
if handler.resource in self.routes:
raise ValidationError("unable to register behavior {}, it has already been registered"
"".format(handler.resource))
self.routes[handler.resource] = handler
def addHandler(self, handler):
if handler.resource in self.routes:
raise ValidationError("unable to register behavior {}, it has already been registered"
"".format(handler.resource))
self.routes[handler.resource] = handler
[docs]
def processEvent(self, serder, tsgs=None, cigars=None, **kwargs):
""" Process one serder event with attached indexed signatures representing a Peer to Peer exchange message.
Parameters:
serder (Serder): instance of event to process
tsgs (list): tuples (quadruples) of form
(prefixer, seqner, diger, [sigers]) where:
prefixer is pre of trans endorser
seqner is sequence number of trans endorser's est evt for keys for sigs
diger is digest of trans endorser's est evt for keys for sigs
[sigers] is list of indexed sigs from trans endorser's keys from est evt
cigars (list): of Cigar instances of attached non-trans sigs
"""
route = serder.ked["r"]
sender = serder.ked["i"]
pathed = kwargs["pathed"] if "pathed" in kwargs else []
behavior = self.routes[route] if route in self.routes else None
if tsgs is not None:
for prefixer, seqner, ssaider, sigers in tsgs: # iterate over each tsg
if sender != prefixer.qb64: # sig not by aid
raise MissingSignatureError(f"Exchange process: skipped signature not from aid="
f"{sender}, from {prefixer.qb64} on exn msg=\n{serder.pretty()}\n")
if prefixer.qb64 not in self.kevers or self.kevers[prefixer.qb64].sn < seqner.sn:
if self.escrowPSEvent(serder=serder, tsgs=tsgs, pathed=pathed):
self.cues.append(dict(kin="query", q=dict(r="logs", pre=prefixer.qb64, sn=seqner.snh)))
raise MissingSignatureError(f"Unable to find sender {prefixer.qb64} in kevers"
f" for evt = {serder.ked}.")
# Verify the signatures are valid and that the signature threshold as of the signing event is met
tholder, verfers = self.hby.db.resolveVerifiers(pre=prefixer.qb64, sn=seqner.sn, dig=ssaider.qb64)
_, indices = eventing.verifySigs(serder.raw, sigers, verfers)
if not tholder.satisfy(indices): # We still don't have all the sigers, need to escrow
if self.escrowPSEvent(serder=serder, tsgs=tsgs, pathed=pathed):
self.cues.append(dict(kin="query", q=dict(r="logs", pre=prefixer.qb64, sn=seqner.snh)))
raise MissingSignatureError(f"Not enough signatures in {indices}"
f" for evt = {serder.ked}.")
elif cigars is not None:
for cigar in cigars:
if sender != cigar.verfer.qb64: # cig not by aid
raise MissingSignatureError(" process: skipped cig not from aid="
"%s on exn msg=\n%s\n", sender, serder.pretty())
if not cigar.verfer.verify(cigar.raw, serder.raw): # cig not verify
raise MissingSignatureError("Failure satisfying exn on cigs for {}"
" for evt = {}.".format(cigar,
serder.ked))
else:
raise MissingSignatureError("Failure satisfying exn, no cigs or sigs"
" for evt = {}.".format(serder.ked))
e = coring.Pather(path=["e"])
attachments = []
for p in pathed:
pattach = bytearray(p)
pather = coring.Pather(qb64b=pattach, strip=True)
if pather.startswith(e):
np = pather.strip(e)
attachments.append((np, pattach))
# Perform behavior specific verification, think IPEX chaining requirements
try:
if not behavior.verify(serder=serder, attachments=attachments):
logger.info(f"exn event for route {route} failed behavior verfication. exn={serder.ked}")
return
except AttributeError:
logger.info(f"Behavior for {route} missing or does not have verify for exn={serder.ked}")
# Always persis events
self.logEvent(serder, pathed, tsgs, cigars)
self.cues.append(dict(kin="saved", said=serder.said))
# Execute any behavior specific handling, not sure if this should be different than verify
try:
behavior.handle(serder=serder, attachments=attachments)
except AttributeError:
logger.info(f"Behavior for {route} missing or does not have handle for exn={serder.ked}")
[docs]
def processEscrow(self):
""" Process all escrows for `exn` messages
"""
self.processEscrowPartialSigned()
[docs]
def escrowPSEvent(self, serder, tsgs, pathed):
""" Escrow event that does not have enough signatures.
Parameters:
serder (Serder): instance of event
tsgs (list): quadlet of prefixer seqner, saider, sigers
pathed (list): list of bytes of attached paths
"""
dig = serder.said
for prefixer, seqner, ssaider, sigers in tsgs: # iterate over each tsg
quadkeys = (serder.said, prefixer.qb64, f"{seqner.sn:032x}", ssaider.qb64)
for siger in sigers:
self.hby.db.esigs.add(keys=quadkeys, val=siger)
self.hby.db.epath.pin(keys=(dig,), vals=[bytes(p) for p in pathed])
return self.hby.db.epse.put(keys=(dig,), val=serder)
[docs]
def processEscrowPartialSigned(self):
""" Process escrow of partially signed messages """
for (dig,), serder in self.hby.db.epse.getItemIter():
tsgs = []
klases = (coring.Prefixer, coring.Seqner, coring.Saider)
args = ("qb64", "snh", "qb64")
sigers = []
old = None # empty keys
for keys, siger in self.hby.db.esigs.getItemIter(keys=(dig, "")):
quad = keys[1:]
if quad != old: # new tsg
if sigers: # append tsg made for old and sigers
prefixer, seqner, saider = helping.klasify(sers=old, klases=klases, args=args)
tsgs.append((prefixer, seqner, saider, sigers))
sigers = []
old = quad
sigers.append(siger)
if sigers and old:
prefixer, seqner, saider = helping.klasify(sers=old, klases=klases, args=args)
tsgs.append((prefixer, seqner, saider, sigers))
pathed = [bytearray(p.encode("utf-8")) for p in self.hby.db.epath.get(keys=(dig,))]
try:
self.processEvent(serder=serder, tsgs=tsgs, pathed=pathed)
except MissingSignatureError as ex:
if logger.isEnabledFor(logging.DEBUG):
logger.info("Exchange partially signed unescrow failed: %s\n", ex.args[0])
else:
logger.info("Exchange partially signed failed: %s\n", ex.args[0])
except Exception as ex:
self.hby.db.epse.rem(dig)
self.hby.db.esigs.rem(dig)
if logger.isEnabledFor(logging.DEBUG):
logger.info("Exchange partially signed unescrowed: %s\n", ex.args[0])
else:
logger.info("Exchange partially signed unescrowed: %s\n", ex.args[0])
else:
self.hby.db.epse.rem(dig)
self.hby.db.esigs.rem(dig)
logger.info("Exchanger unescrow succeeded in valid exchange: "
"creder=\n%s\n", serder.pretty())
def logEvent(self, serder, pathed=None, tsgs=None, cigars=None):
dig = serder.said
pdig = serder.ked['p']
pathed = pathed or []
tsgs = tsgs or []
cigars = cigars or []
for prefixer, seqner, ssaider, sigers in tsgs: # iterate over each tsg
quadkeys = (serder.said, prefixer.qb64, f"{seqner.sn:032x}", ssaider.qb64)
for siger in sigers:
self.hby.db.esigs.add(keys=quadkeys, val=siger)
for cigar in cigars:
self.hby.db.ecigs.add(keys=(dig,), val=(cigar.verfer, cigar))
saider = coring.Saider(qb64=serder.said)
self.hby.db.epath.pin(keys=(dig,), vals=[bytes(p) for p in pathed])
if pdig:
self.hby.db.erpy.pin(keys=(pdig,), val=saider)
self.hby.db.exns.put(keys=(dig,), val=serder)
[docs]
def lead(self, hab, said):
""" Determines is current member represented by hab is the lead of an exn message
Lead is the signer of the exn with the lowest signing index
Parameters:
hab (Hab): Habitat for sending of exchange message represented by SAID
said (str): qb64 SAID of exchange message
Returns:
bool: True means hab is the lead
"""
if not isinstance(hab, habbing.GroupHab):
return True
keys = [verfer.qb64 for verfer in hab.kever.verfers]
tsgs = eventing.fetchTsgs(self.hby.db.esigs, coring.Saider(qb64=said))
if not tsgs: # otherwise it contains a list of sigs
return False
(_, _, _, sigers) = tsgs[0]
windex = min([siger.index for siger in sigers])
# True if Elected to send an EXN to its recipient
return hab.mhab.kever.verfers[0].qb64 == keys[windex]
[docs]
def complete(self, said):
"""
Args:
said (str): qb64 said of exchange message to check status
Returns:
bool: True means exchange message is has been saved
"""
serder = self.hby.db.exns.get(keys=(said,))
if not serder:
return False
else:
if serder.said != said:
raise kering.ValidationError(f"invalid exchange escrowed event {serder.said}-{said}")
return True
[docs]
def exchange(route,
payload,
sender,
recipient=None,
date=None,
dig=None,
modifiers=None,
embeds=None,
version=coring.Version,
kind=coring.Serials.json):
""" Create an `exn` message with the specified route and payload
Parameters:
route (str): to destination route of the message
payload (list | dict): body of message to deliver to route
sender (str): qb64 AID of sender of the exn
recipient (str) optional qb64 AID recipient of exn
date (str): Iso8601 formatted date string to use for this request
dig (str) qb64 SAID of previous event if any
modifiers (dict): equivalent of query string of uri, modifiers for the request that are not
part of the payload
embeds (dict): named embeded KERI event CESR stream with attachments
version (Version): is Version instance
kind (Serials): is serialization kind
"""
vs = coring.versify(version=version, kind=kind, size=0)
ilk = eventing.Ilks.exn
dt = date if date is not None else helping.nowIso8601()
p = dig if dig is not None else ""
embeds = embeds if embeds is not None else {}
e = dict()
end = bytearray()
for label, msg in embeds.items():
serder = coring.Sadder(raw=msg)
e[label] = serder.ked
atc = bytes(msg[serder.size:])
if not atc:
continue
pathed = bytearray()
pather = coring.Pather(path=["e", label])
pathed.extend(pather.qb64b)
pathed.extend(atc)
end.extend(coring.Counter(code=coring.CtrDex.PathedMaterialQuadlets,
count=(len(pathed) // 4)).qb64b)
end.extend(pathed)
if e:
e["d"] = ""
_, e = coring.Saider.saidify(sad=e, label=coring.Saids.d)
attrs = dict(
)
if recipient is not None:
attrs['i'] = recipient
attrs |= payload
ked = dict(v=vs,
t=ilk,
d="",
i=sender,
p=p,
dt=dt,
r=route,
q=modifiers if modifiers is not None else {}, # q field required
a=attrs,
e=e)
_, ked = coring.Saider.saidify(sad=ked)
return serdering.SerderKERI(sad=ked), end # return serialized ked
[docs]
def cloneMessage(hby, said):
""" Load and verify signatures on message exn
Parameters:
hby (Habery): database environment from which to clone message
said (str): qb64 SAID of message exn to load
Returns:
tuple: (serder, list) of message exn and pathed signatures on embedded attachments
"""
exn = hby.db.exns.get(keys=(said,))
if exn is None:
return None, None
verify(hby=hby, serder=exn)
pathed = dict()
e = coring.Pather(path=["e"])
for p in hby.db.epath.get(keys=(exn.said,)):
pb = bytearray(p.encode("utf-8"))
pather = coring.Pather(qb64b=pb, strip=True)
if pather.startswith(e):
np = pather.strip(e)
nesting(np.path, pathed, pb)
return exn, pathed
def serializeMessage(hby, said, pipelined=False):
atc = bytearray()
exn = hby.db.exns.get(keys=(said,))
if exn is None:
return None, None
atc.extend(exn.raw)
tsgs, cigars = verify(hby=hby, serder=exn)
if len(tsgs) > 0:
for (prefixer, seqner, saider, sigers) in tsgs:
atc.extend(coring.Counter(coring.CtrDex.TransIdxSigGroups, count=1).qb64b)
atc.extend(prefixer.qb64b)
atc.extend(seqner.qb64b)
atc.extend(saider.qb64b)
atc.extend(coring.Counter(code=coring.CtrDex.ControllerIdxSigs, count=len(sigers)).qb64b)
for siger in sigers:
atc.extend(siger.qb64b)
if len(cigars) > 0:
atc.extend(coring.Counter(code=coring.CtrDex.NonTransReceiptCouples, count=len(cigars)).qb64b)
for cigar in cigars:
if cigar.verfer.code not in coring.NonTransDex:
raise ValueError("Attempt to use tranferable prefix={} for "
"receipt.".format(cigar.verfer.qb64))
atc.extend(cigar.verfer.qb64b)
atc.extend(cigar.qb64b)
# Smash the pathed components on the end
for p in hby.db.epath.get(keys=(exn.said,)):
atc.extend(coring.Counter(code=coring.CtrDex.PathedMaterialQuadlets,
count=(len(p) // 4)).qb64b)
atc.extend(p.encode("utf-8"))
msg = bytearray()
if pipelined:
if len(atc) % 4:
raise ValueError("Invalid attachments size={}, nonintegral"
" quadlets.".format(len(atc)))
msg.extend(coring.Counter(code=coring.CtrDex.AttachedMaterialQuadlets,
count=(len(atc) // 4)).qb64b)
msg.extend(atc)
return msg
def nesting(paths, acc, val):
if len(paths) == 0:
return val
else:
first_value = paths[0]
nacc = dict()
acc[first_value] = nesting(paths[1:], nacc, val)
return acc
[docs]
def verify(hby, serder):
""" Verify that the signatures in the database are valid for the provided exn
Parameters:
hby (Habery): database environment from which to verify message
serder (Serder): exn serder to load and verify signatures for
Returns:
bool: True means threshold satisfyig signatures were loaded and verified successfully
"""
tsgs = []
klases = (coring.Prefixer, coring.Seqner, coring.Saider)
args = ("qb64", "snh", "qb64")
sigers = []
old = None # empty keys
for keys, siger in hby.db.esigs.getItemIter(keys=(serder.said, "")):
quad = keys[1:]
if quad != old: # new tsg
if sigers: # append tsg made for old and sigers
prefixer, seqner, saider = helping.klasify(sers=old, klases=klases, args=args)
tsgs.append((prefixer, seqner, saider, sigers))
sigers = []
old = quad
sigers.append(siger)
if sigers and old:
prefixer, seqner, saider = helping.klasify(sers=old, klases=klases, args=args)
tsgs.append((prefixer, seqner, saider, sigers))
accepted = False
for prefixer, seqner, ssaider, sigers in tsgs:
if prefixer.qb64 not in hby.kevers or hby.kevers[prefixer.qb64].sn < seqner.sn:
raise MissingSignatureError(f"Unable to find sender {prefixer.qb64} in kevers"
f" for evt = {serder.ked}.")
# Verify the signatures are valid and that the signature threshold as of the signing event is met
tholder, verfers = hby.db.resolveVerifiers(pre=prefixer.qb64, sn=seqner.sn, dig=ssaider.qb64)
_, indices = eventing.verifySigs(serder.raw, sigers, verfers)
if not tholder.satisfy(indices): # We still don't have all the sigers, need to escrow
raise MissingSignatureError(f"Not enough signatures in {indices}"
f" for evt = {serder.ked}.")
accepted = True
cigars = hby.db.ecigs.get(keys=(serder.said,))
for cigar in cigars:
if not cigar.verfer.verify(cigar.raw, serder.raw): # cig not verify
raise MissingSignatureError("Failure satisfying exn on cigs for {}"
" for evt = {}.".format(cigar,
serder.ked))
accepted = True
if not accepted:
raise MissingSignatureError(f"No valid signatures stored for evt = {serder.ked}")
return tsgs, cigars