# -*- encoding: utf-8 -*-
"""
KERI
keri.app.forwarding module
module for enveloping and forwarding KERI message
"""
import random
import pysodium
from ordered_set import OrderedSet as oset
from hio.base import doing
from hio.help import decking, ogler
from ..kering import (Roles, Vrsn_1_0, Kinds,
ConfigurationError, ValidationError)
from .agenting import messengerFrom, streamMessengerFrom
from ..core import (Bexter, Prefixer, Verfer, Texter, Diger,
Sadder, Counter, SerderKERI,
MtrDex, Codens, NonTransDex, exchange)
from ..db import dgKey
from ..peer import specialExchange
from ..spac import PayloadTyper, PayloadTypes
logger = ogler.getLogger()
[docs]
class Poster(doing.DoDoer):
"""
DoDoer that wraps any KERI event (KEL, TEL, Peer to Peer) in a /fwd `exn` envelope and
delivers them to one of the target receiver's witnesses for store and forward
to the intended receiver
"""
def __init__(self, hby, mbx=None, evts=None, cues=None, **kwa):
self.hby = hby
self.mbx = mbx
self.evts = evts if evts is not None else decking.Deck()
self.cues = cues if cues is not None else decking.Deck()
doers = [doing.doify(self.deliverDo)]
super(Poster, self).__init__(doers=doers, **kwa)
[docs]
def deliverDo(self, tymth=None, tock=0.0, **kwa):
"""
Returns: doifiable Doist compatible generator method that processes
a queue of messages and envelopes them in a `fwd` message
and sends them to one of the witnesses of the receiver for
store and forward.
Usage:
add result of doify on this method to doers list
"""
# enter context
self.wind(tymth)
self.tock = tock
_ = (yield self.tock)
while True:
while self.evts:
evt = self.evts.popleft()
src = evt["src"]
recp = evt["dest"]
tpc = evt["topic"]
srdr = evt["serder"]
atc = evt["attachment"] if "attachment" in evt else None
# Get the hab of the sender
if "hab" in evt:
hab = evt["hab"]
else:
hab = self.hby.habs[src]
ends = hab.endsFor(recp)
try:
# If there is a controller, agent or mailbox in ends, send to all
if {Roles.controller, Roles.agent, Roles.mailbox} & set(ends):
for role in (Roles.controller, Roles.agent, Roles.mailbox):
if role in ends:
if role == Roles.mailbox:
yield from self.forward(hab,
ends[role],
recp=recp,
serder=srdr,
atc=atc,
topic=tpc)
else:
yield from self.sendDirect(hab, ends[role], serder=srdr, atc=atc)
# otherwise send to one witness
elif Roles.witness in ends:
yield from self.forwardToWitness(hab,
ends[Roles.witness],
recp=recp,
serder=srdr,
atc=atc,
topic=tpc)
else:
logger.info(f"No end roles for {recp} to send evt={srdr.said}")
continue
except ConfigurationError as e:
logger.error(f"Error sending to {recp} with ends={ends}. Err={e}")
continue
# Get the kever of the receiver and choose a witness
self.cues.append(dict(dest=recp, topic=tpc, said=srdr.said))
yield self.tock
yield self.tock
[docs]
def send(self, dest, topic, serder, src=None, hab=None, attachment=None):
"""
Utility function to queue a msg on the Poster's buffer for
enveloping and forwarding to a witness
Parameters:
src (str): qb64 identifier prefix of sender
hab (Hab): Sender identifier habitat
dest (str) is identifier prefix qb64 of the intended receiver
topic (str): topic of message
serder (Serder) KERI event message to envelope and forward:
attachment (bytes): attachment bytes
"""
src = src if src is not None else hab.pre
evt = dict(src=src, dest=dest, topic=topic, serder=serder)
if attachment is not None:
evt["attachment"] = attachment
if hab is not None:
evt["hab"] = hab
self.evts.append(evt)
[docs]
def sent(self, said):
""" Check if message with given SAID was sent
Parameters:
said (str): qb64 SAID of message to check for
"""
for cue in self.cues:
if cue["said"] == said:
return True
return False
[docs]
def sendEventToDelegator(self, sender, hab, fn=0):
""" Returns generator for sending event and waiting until send is complete """
# Send KEL event for processing
icp = self.hby.db.cloneEvtMsg(pre=hab.pre, fn=fn, dig=hab.kever.serder.saidb)
ser = SerderKERI(raw=icp)
del icp[:ser.size]
self.send(src=sender.pre, dest=hab.kever.delpre, topic="delegate", serder=ser, attachment=icp)
while True:
if self.cues:
cue = self.cues.popleft()
if cue["said"] == ser.said:
break
else:
self.cues.append(cue)
yield self.tock
def sendDirect(self, hab, ends, serder, atc):
for ctrl, locs in ends.items():
witer = messengerFrom(hab=hab, pre=ctrl, urls=locs)
msg = bytearray(serder.raw)
if atc is not None:
msg.extend(atc)
witer.msgs.append(bytearray(msg)) # make a copy
self.extend([witer])
while not witer.idle:
_ = (yield self.tock)
self.remove([witer])
def forward(self, hab, ends, recp, serder, atc, topic):
# If we are one of the mailboxes, just store locally in mailbox
owits = oset(ends.keys())
if self.mbx and owits.intersection(hab.prefixes):
msg = bytearray(serder.raw)
if atc is not None:
msg.extend(atc)
self.mbx.storeMsg(topic=f"{recp}/{topic}".encode("utf-8"), msg=msg)
return
# Its not us, randomly select a mailbox and forward it on
mbx, mailbox = random.choice(list(ends.items()))
msg = bytearray()
msg.extend(introduce(hab, mbx))
# create the forward message with payload embedded at `a` field
evt = bytearray(serder.raw)
evt.extend(atc)
fwd, atc = specialExchange(sender=hab.pre,
route='/fwd',
modifiers=dict(pre=recp, topic=topic),
attributes={},
embeds=dict(evt=evt), )
ims = hab.endorse(serder=fwd, last=False, framed=True)
# Transpose the signatures to point to the new location
witer = messengerFrom(hab=hab, pre=mbx, urls=mailbox)
msg.extend(ims)
msg.extend(atc)
witer.msgs.append(bytearray(msg)) # make a copy
self.extend([witer])
while not witer.idle:
_ = (yield self.tock)
def forwardToWitness(self, hab, ends, recp, serder, atc, topic):
# If we are one of the mailboxes, just store locally in mailbox
owits = oset(ends.keys())
if self.mbx and owits.intersection(hab.prefixes):
msg = bytearray(serder.raw)
if atc is not None:
msg.extend(atc)
self.mbx.storeMsg(topic=f"{recp}/{topic}".encode("utf-8"), msg=msg)
return
# Its not us, randomly select a mailbox and forward it on
mbx, mailbox = random.choice(list(ends.items()))
msg = bytearray()
msg.extend(introduce(hab, mbx))
# create the forward message with payload embedded at `a` field
evt = bytearray(serder.raw)
evt.extend(atc)
fwd, atc = specialExchange(sender=hab.pre,
route='/fwd',
modifiers=dict(pre=recp, topic=topic),
attributes={},
embeds=dict(evt=evt))
ims = hab.endorse(serder=fwd, last=False, framed=True)
# Transpose the signatures to point to the new location
witer = messengerFrom(hab=hab, pre=mbx, urls=mailbox)
msg.extend(ims)
msg.extend(atc)
witer.msgs.append(bytearray(msg)) # make a copy
self.extend([witer])
while not witer.idle:
_ = (yield self.tock)
[docs]
class StreamPoster:
"""
DoDoer that wraps any KERI event (KEL, TEL, Peer to Peer) in a /fwd `exn` envelope and
delivers them to one of the target receiver's witnesses for store and forward
to the intended receiver
"""
def __init__(self, hby, recp, src=None, hab=None, mbx=None, topic=None, headers=None, essr=False, **kwa):
if hab is not None:
self.hab = hab
else:
self.hab = hby.habs[src]
self.hby = hby
self.hab = hab
self.recp = recp
self.src = src
self.messagers = []
self.mbx = mbx
self.topic = topic
self.headers = headers
self.essr = essr
self.evts = decking.Deck()
[docs]
def deliver(self):
"""
Returns: doifiable Doist compatible generator method that processes
a queue of messages and envelopes them in a `fwd` message
and sends them to one of the witnesses of the receiver for
store and forward.
Usage:
add result of doify on this method to doers list
"""
doers = []
while self.evts:
doers += self._chunk()
return doers
def _chunk(self):
msg = bytearray()
if self.essr:
msg.extend(PayloadTyper(type=PayloadTypes.SCS).qb64b)
msg.extend(self.hab.kever.prefixer.qb64b)
# bext field can be randomized to reduce correlation based on packet size, empty for now
msg.extend(Bexter(bext="").qb64b)
while self.evts:
evt = self.evts.popleft()
serder = evt["serder"]
atc = evt["attachment"] if "attachment" in evt else b''
if self.essr and len(msg) + len(serder.raw) + len(atc) > 16384:
self.evts.appendleft(evt)
break
msg.extend(serder.raw)
msg.extend(atc)
if len(msg) == 0:
return []
ends = self.hab.endsFor(self.recp)
try:
# If there is a controller or agent in ends, send to all
if {Roles.controller, Roles.agent, Roles.mailbox} & set(ends):
for role in (Roles.controller, Roles.agent, Roles.mailbox):
if role in ends:
if role == Roles.mailbox:
return self.forward(self.hab, ends[role], msg=msg, topic=self.topic)
else:
return self.sendDirect(self.hab, ends[role], msg=msg)
# otherwise send to one witness
elif Roles.witness in ends:
return self.forward(self.hab, ends[Roles.witness], msg=msg, topic=self.topic)
else:
logger.info(f"No end roles for {self.recp} to send evt={self.recp}")
return []
except ConfigurationError as e:
logger.error(f"Error sending to {self.recp} with ends={ends}. Err={e}")
return []
[docs]
def send(self, serder, attachment=None):
"""
Utility function to queue a msg on the Poster's buffer for
enveloping and forwarding to a witness
Parameters:
serder (Serder) KERI event message to envelope and forward:
attachment (bytes): attachment bytes
"""
ends = self.hab.endsFor(self.recp)
try:
# If there is a controller, agent or mailbox in ends, send to all
if {Roles.controller, Roles.agent, Roles.mailbox} & set(ends):
for role in (Roles.controller, Roles.agent, Roles.mailbox):
if role in ends:
if role == Roles.mailbox:
serder, attachment = self.createForward(self.hab, serder=serder, ends=ends,
atc=attachment, topic=self.topic)
# otherwise send to one witness
elif Roles.witness in ends:
serder, attachment = self.createForward(self.hab, ends=ends, serder=serder,
atc=attachment, topic=self.topic)
else:
logger.info(f"No end roles for {self.recp} to send evt={self.recp}")
raise ValidationError(f"No end roles for {self.recp} to send evt={self.recp}")
except ConfigurationError as e:
logger.error(f"Error sending to {self.recp} with ends={ends}. Err={e}")
raise ValidationError(f"Error sending to {self.recp} with ends={ends}. Err={e}")
evt = dict(serder=serder)
if attachment is not None:
evt["attachment"] = attachment
self.evts.append(evt)
def sendDirect(self, hab, ends, msg):
for ctrl, locs in ends.items():
ims = self._essrWrapper(hab, msg, ctrl) if self.essr else msg
self.messagers.append(streamMessengerFrom(hab=hab, pre=ctrl, urls=locs, msg=ims,
headers=self.headers))
return self.messagers
def _essrWrapper(self, hab, msg, ctrl):
prefixer = Prefixer(qb64=ctrl)
if prefixer.code in NonTransDex: # e.g. witness mbx
verfer = Verfer(qb64=ctrl)
else:
rkever = self.hby.kevers[ctrl]
verfer = rkever.verfers[0]
pubkey = pysodium.crypto_sign_pk_to_box_pk(verfer.raw)
raw = pysodium.crypto_box_seal(bytes(msg), pubkey)
texter = Texter(raw=raw)
diger = Diger(ser=raw, code=MtrDex.Blake3_256)
essr, _ = specialExchange(sender=hab.pre,
route='/essr/req',
modifiers=dict(src=hab.pre, dest=ctrl),
diger=diger,)
ims = hab.endorse(serder=essr, framed=True)
ims.extend(Counter(Codens.ESSRPayloadGroup, count=1,
gvrsn=Vrsn_1_0).qb64b)
ims.extend(texter.qb64b)
return ims
def createForward(self, hab, ends, serder, atc, topic):
# If we are one of the mailboxes, just store locally in mailbox
owits = oset(ends.keys())
if self.mbx and owits.intersection(hab.prefixes):
msg = bytearray(serder.raw)
if atc is not None:
msg.extend(atc)
self.mbx.storeMsg(topic=f"{self.recp}/{topic}".encode("utf-8"), msg=msg)
return None, None
# Its not us, randomly select a mailbox and forward it on
evt = bytearray(serder.raw)
evt.extend(atc)
fwd, atc = specialExchange(sender=hab.pre,
route='/fwd',
modifiers=dict(pre=self.recp, topic=topic),
attributes={},
embeds=dict(evt=evt))
ims = hab.endorse(serder=fwd, last=False, framed=True)
return fwd, ims + atc
def forward(self, hab, ends, msg, topic):
# If we are one of the mailboxes, just store locally in mailbox
owits = oset(ends.keys())
if self.mbx and owits.intersection(hab.prefixes):
# Remove again if ESSR mode
if self.essr:
_tag = self.hby.psr.extract(msg, PayloadTyper)
_pre = self.hby.psr.extract(msg, Prefixer)
_pad = self.hby.psr.extract(msg, Bexter)
self.mbx.storeMsg(topic=f"{self.recp}/{topic}".encode("utf-8"), msg=msg)
return []
# Its not us, randomly select a mailbox and forward it on
mbx, mailbox = random.choice(list(ends.items()))
if self.essr:
msg = self._essrWrapper(hab, msg, mbx)
ims = bytearray()
ims.extend(introduce(hab, mbx))
ims.extend(msg)
self.messagers.append(streamMessengerFrom(hab=hab, pre=mbx, urls=mailbox, msg=bytes(ims)))
return self.messagers
[docs]
class ForwardHandler:
"""
Handler for forward `exn` messages used to envelope other KERI messages intended for another receiver.
This handler acts as a mailbox for other identifiers and stores the messages in a local database.
Example message::\n\n {
"v": "KERI10JSON00011c_", // KERI Version String
"t": "exn", // peer to peer message ilk
"dt": "2020-08-22T17:50:12.988921+00:00"
"r": "/fwd",
"q": {
"pre": "EEBp64Aw2rsjdJpAR0e2qCq3jX7q7gLld3LjAwZgaLXU",
"topic": "delegate"
}
"a": '{
"v":"KERI10JSON000154_",
"t":"dip",
"d":"Er4bHXd4piEtsQat1mquwsNZXItvuoj_auCUyICmwyXI",
"i":"Er4bHXd4piEtsQat1mquwsNZXItvuoj_auCUyICmwyXI",
"s":"0",
"kt":"1",
"k":["DuK1x8ydpucu3480Jpd1XBfjnCwb3dZ3x5b1CJmuUphA"],
"n":"EWWkjZkZDXF74O2bOQ4H5hu4nXDlKg2m4CBEBkUxibiU",
"bt":"0",
"b":[],
"c":[],
"a":[],
"di":"Et78eYkh8A3H9w6Q87EC5OcijiVEJT8KyNtEGdpPVWV8"
}
}-AABAA1o61PgMhwhi89FES_vwYeSbbWnVuELV_jv7Yv6f5zNiOLnj1ZZa4MW2c6Z_vZDt55QUnLaiaikE-d_ApsFEgCA
"""
resource = "/fwd"
[docs]
def __init__(self, hby, mbx):
"""
Parameters:
hby (Habery): database environment
mbx (Mailboxer): message storage for store and forward
"""
self.hby = hby
self.mbx = mbx
[docs]
def handle(self, serder, attachments=None):
""" Do route specific processsing of IPEX protocol exn messages
Parameters:
serder (Serder): Serder of the IPEX protocol exn message
attachments (list): list of tuples of root pathers and CESR SAD path attachments to the exn event
"""
embeds = serder.ked['e']
modifiers = serder.ked['q'] if 'q' in serder.ked else {}
receiver = modifiers["pre"]
topic = modifiers["topic"]
resource = f"{receiver}/{topic}"
pevt = bytearray()
for pather, atc in attachments:
ked = pather.resolve(embeds)
sadder = Sadder(ked=ked, kind=Kinds.json)
pevt.extend(sadder.raw)
pevt.extend(atc)
if not pevt:
print("error with message, nothing to forward", serder.ked)
return
self.mbx.storeMsg(topic=resource, msg=pevt)
[docs]
def introduce(hab, wit):
""" Clone and return hab KEL if lastest event has not been receipted by wit
Check to see if the target witness has already provided a receipt for the latest event
for the identifier of hab, clone the KEL and return it as a bytearray so it can be sent to
the target.
Parameters:
hab (Hab): local environment for the identifier to propagate
wit (str): qb64 identifier prefix of the receiver of KEL if not already receipted
Returns:
bytearray: cloned KEL of hab
"""
msgs = bytearray()
if wit in hab.kever.wits:
return msgs
iserder = hab.kever.serder
witPrefixer = Prefixer(qb64=wit)
dgkey = dgKey(wit, iserder.said)
found = False
if witPrefixer.transferable: # find if have rct from other pre for own icp
for sprefixer, snum, sdiger, siger in hab.db.vrcs.getIter(dgkey):
# Receipt is from this hab if the prefix matches
if sprefixer.qb64 == hab.pre:
found = True
else: # find if already rcts of own icp
for prefixer, cigar in hab.db.rcts.getIter(dgkey):
if prefixer.qb64.startswith(hab.pre):
found = True # yes so don't send own inception
if not found: # no receipt from remote so send own inception
# no vrcs or rct of own icp from remote so send own inception
for msg in hab.db.clonePreIter(pre=hab.pre):
msgs.extend(msg)
for msg in hab.db.cloneDelegation(hab.kever):
msgs.extend(msg)
msgs.extend(hab.replyEndRole(cid=hab.pre))
return msgs