Source code for keri.app.storing

# -*- encoding: utf-8 -*-
"""
keri.app.storing module

"""

from hio.base import doing
from hio.help import decking
from ordered_set import OrderedSet as oset

from . import forwarding
from .. import help
from ..core import coring, serdering
from ..core.coring import MtrDex
from ..db import dbing, subing

# Module-level logger shared by everything in this module; obtained from the
# package's `help.ogler` object.
logger = help.ogler.getLogger()


class Mailboxer(dbing.LMDBer):
    """
    Mailboxer stores exn messages in insertion order per topic and provides
    list and iterator access starting at a given index.

    Attributes:
        tpcs: dupsort sub-database opened with key ``b'tpcs.'``; holds, for
            each topic, the ordered digests of the messages stored there.
        msgs (subing.Suber): sub-database with subkey ``'msgs.'``; holds the
            message body keyed by its digest.

    Both attributes are ``None`` until ``reopen`` has run.
    """
    TailDirPath = "keri/mbx"
    AltTailDirPath = ".keri/mbx"
    TempPrefix = "keri_mbx_"

    def __init__(self, name="mbx", headDirPath=None, reopen=True, **kwa):
        """
        Parameters:
            name: forwarded unchanged to the LMDBer initializer
            headDirPath: forwarded unchanged to the LMDBer initializer
            reopen: forwarded unchanged to the LMDBer initializer
            kwa: any further keyword arguments, forwarded to the LMDBer
                initializer
        """
        # Placeholders must exist before the base initializer runs because
        # reopen() is what assigns the real sub-databases.
        self.tpcs = self.msgs = None
        super().__init__(name=name, headDirPath=headDirPath, reopen=reopen, **kwa)

    @staticmethod
    def _toBytes(val):
        """Return val utf-8 encoded when it has an ``encode`` method, else val unchanged."""
        return val.encode("utf-8") if hasattr(val, "encode") else val

    def reopen(self, **kwa):
        """
        Reopen the underlying database and bind the mailbox sub-databases.

        Parameters:
            kwa: keyword arguments forwarded to ``LMDBer.reopen``

        Returns:
            the value of ``self.env`` once the sub-databases are bound
        """
        super().reopen(**kwa)

        self.tpcs = self.env.open_db(key=b'tpcs.', dupsort=True)  # topic -> ordered digests
        self.msgs = subing.Suber(db=self, subkey='msgs.')  # digest -> message body

        return self.env

    def delTopic(self, key):
        """
        Delete the topic index entries stored at key.

        Parameters:
            key: topic key whose entries are removed from ``.tpcs``

        Returns:
            result of ``delIoSetVals``: True if key existed in the database,
            False otherwise
        """
        return self.delIoSetVals(self.tpcs, key)

    def appendToTopic(self, topic, val):
        """
        Append val after the last entry already stored for topic.

        Parameters:
            topic (bytes): topic the value is filed under
            val (bytes): message digest to append

        Returns:
            result of ``appendIoSetVal`` for the appended entry (the ordinal
            assigned to it)
        """
        return self.appendIoSetVal(db=self.tpcs, key=topic, val=val)

    def getTopicMsgs(self, topic, fn=0):
        """
        Collect the stored messages for topic starting at index fn.

        Parameters:
            topic (bytes | str): topic to read; str is utf-8 encoded first
            fn (int): starting index within the topic

        Returns:
            list[bytes]: utf-8 encoded message bodies in stored order. Digests
            with no (or an empty) body in ``.msgs`` are skipped.
        """
        digs = self.getIoSetVals(db=self.tpcs, key=self._toBytes(topic), ion=fn)
        return [body.encode("utf-8") for dig in digs if (body := self.msgs.get(keys=dig))]

    def storeMsg(self, topic, msg):
        """
        Store msg and append its digest to the index for topic.

        Parameters:
            topic (bytes | str): topic to file the message under; str is
                utf-8 encoded first
            msg (bytes | str): message to store; str is utf-8 encoded first

        Returns:
            result of pinning the message body at its digest in ``.msgs``
        """
        topicb = self._toBytes(topic)
        msgb = self._toBytes(msg)

        # The Blake3-256 digest of the message is both the topic entry and
        # the lookup key of the body.
        digb = coring.Diger(ser=msgb, code=MtrDex.Blake3_256).qb64b
        self.appendToTopic(topic=topicb, val=digb)
        return self.msgs.pin(keys=digb, val=msgb)

    def cloneTopicIter(self, topic, fn=0):
        """
        Iterate over the stored messages for topic starting at index fn.

        Parameters:
            topic (bytes | str): topic to read; str is utf-8 encoded first
            fn (int): starting index within the topic

        Yields:
            tuple: ``(ion, key, msg)`` where ``key`` and ``ion`` are the two
            parts ``dbing.unsuffix`` splits the stored key into and ``msg`` is
            the utf-8 encoded message body. Entries whose body is missing or
            empty are skipped.
        """
        start = self._toBytes(topic)
        for iokey, dig in self.getIoSetItemsIter(self.tpcs, key=start, ion=fn):
            base, ion = dbing.unsuffix(iokey)
            body = self.msgs.get(keys=dig)
            if not body:
                continue
            yield ion, base, body.encode("utf-8")
class Respondant(doing.DoDoer):
    """
    Respondant processes a buffer of response messages produced by inbound
    'exn' message handlers, plus a buffer of cues, and hands each resulting
    message to its Poster (``.postman``) for delivery to the destination.

    NOTE(review): how the Poster routes a message (forwarding to one of the
    destination's witnesses versus storing in the local mailbox) is decided
    in ``forwarding.Poster`` and is not visible from this class.

    Attributes:
        reps (decking.Deck): pending response messages
        cues (decking.Deck): pending cues
        hby: local environment used to look up habs, kevers and prefixes
        aids: optional collection of AIDs receipts are restricted to
        mbx (Mailboxer): mailbox storage handed to the Poster
        postman (forwarding.Poster): doer that performs the delivery
    """

    def __init__(self, hby, reps=None, cues=None, mbx=None, aids=None, **kwa):
        """
        Create a Respondant that uses the local environment to find the
        destination and sender of each message.

        Parameters:
            hby: local environment; must provide ``name``, ``habs``,
                ``kevers``, ``prefixes`` and ``habByPre``
            reps (decking.Deck | None): buffer of response messages; a new
                Deck is created when None
            cues (decking.Deck | None): buffer of cues; a new Deck is created
                when None
            mbx (Mailboxer | None): storage for local messages; a Mailboxer
                named after ``hby.name`` is created when None
            aids: optional collection of AIDs; when not None, receipt cues
                for any other AID are skipped
            kwa: extra keyword arguments forwarded to DoDoer
        """
        self.reps = reps if reps is not None else decking.Deck()
        self.cues = cues if cues is not None else decking.Deck()

        self.hby = hby
        self.aids = aids
        self.mbx = mbx if mbx is not None else Mailboxer(name=self.hby.name)
        self.postman = forwarding.Poster(hby=self.hby, mbx=self.mbx)

        doers = [self.postman, doing.doify(self.responseDo), doing.doify(self.cueDo)]
        super().__init__(doers=doers, **kwa)

    def responseDo(self, tymth=None, tock=0.0):
        """
        Doifiable Doist compatible generator method that processes response
        messages from `exn` handlers.

        Each entry popped from ``.reps`` is a mapping with keys ``src``,
        ``dest``, ``rep`` and ``topic``. A response is skipped (with an error
        log) when its destination is not in ``hby.kevers`` or its sender is
        not in ``hby.habs``. Otherwise the response is endorsed by the sender
        hab and handed to the Poster.

        Usage:
            add result of doify on this method to doers list
        """
        # enter context
        self.wind(tymth)
        self.tock = tock
        _ = (yield self.tock)

        while True:
            while self.reps:
                rep = self.reps.popleft()
                sender = rep["src"]
                recipient = rep["dest"]
                exn = rep["rep"]
                topic = rep["topic"]

                if recipient not in self.hby.kevers:
                    logger.error("unable to reply, dest %s not found", recipient)
                    continue

                # An unknown sender used to raise KeyError here and end the
                # doer; skip the single bad response instead, the same way an
                # unknown destination is skipped above.
                if sender not in self.hby.habs:
                    logger.error("unable to reply, src %s not found", sender)
                    continue

                senderHab = self.hby.habs[sender]
                # Forward through the hab referenced by .mhab when it is set,
                # otherwise through the sender hab itself.
                forwardHab = senderHab.mhab or senderHab

                # sign the exn to get the signature
                eattach = senderHab.endorse(exn, last=False, pipelined=False)
                # drop the leading exn.size bytes so only what follows the
                # message itself is passed on as the attachment
                del eattach[:exn.size]
                self.postman.send(recipient, topic=topic, serder=exn,
                                  hab=forwardHab, attachment=eattach)

                yield self.tock  # throttle: just do one response at a time

            yield self.tock

    def cueDo(self, tymth=None, tock=0.0):
        """
        Doifiable Doist compatible generator method (doer dog) that processes
        the cues deque.

        Handles cues whose ``kin`` is ``receipt``, ``replay`` or ``reply``;
        cues of any other kind are dropped.

        Usage:
            add result of doify on this method to doers list
        """
        # enter context
        self.wind(tymth)
        self.tock = tock
        _ = (yield self.tock)

        while True:
            while self.cues:  # iteratively process each cue in cues
                cue = self.cues.pull()
                cueKin = cue["kin"]  # type or kind of cue

                if cueKin == "receipt":
                    self._receiptCue(cue)
                elif cueKin == "replay":
                    self._replayCue(cue)
                elif cueKin == "reply":
                    self._replyCue(cue)

            yield self.tock

    def _receiptCue(self, cue):
        """Receipt the event in cue with a local hab that is one of its witnesses, if any."""
        serder = cue["serder"]  # Serder of received event for other pre
        pre = coring.Prefixer(qb64=serder.ked["i"]).qb64

        # If configured with a list of acceptable AIDs to witness for, check it here
        if self.aids is not None and pre not in self.aids:
            return

        if pre not in self.hby.kevers:
            return

        # witnesses of the event's controller that are also local prefixes
        owits = oset(self.hby.kevers[pre].wits)
        match = owits.intersection(self.hby.prefixes)
        if not match:
            return

        hab = self.hby.habByPre(match.pop())
        if hab is None:
            return

        raw = hab.receipt(serder)
        rserder = serdering.SerderKERI(raw=raw)
        # drop the leading rserder.size bytes; the remainder is sent as the attachment
        del raw[:rserder.size]
        self.postman.send(serder.pre, topic="receipt", serder=rserder, hab=hab, attachment=raw)

    def _replayCue(self, cue):
        """Send every message in a replay cue to its destination on behalf of the source hab."""
        src = cue["src"]
        dest = cue["dest"]
        msgs = cue["msgs"]

        hab = self.hby.habByPre(src)
        if hab is None:
            return

        if dest not in self.hby.kevers:
            return

        for msg in msgs:
            raw = bytearray(msg)  # mutable copy so the message part can be stripped
            serder = serdering.SerderKERI(raw=raw)
            del raw[:serder.size]
            self.postman.send(dest, topic="replay", serder=serder, hab=hab, attachment=raw)

    def _replyCue(self, cue):
        """Endorse the serder in a reply cue with the source hab and send it to the destination."""
        src = cue["src"]
        serder = cue["serder"]
        dest = cue["dest"]

        if dest not in self.hby.kevers:
            return

        hab = self.hby.habByPre(src)
        if hab is None:
            return

        atc = hab.endorse(serder)
        # drop the leading serder.size bytes; the remainder is sent as the attachment
        del atc[:serder.size]
        self.postman.send(hab=hab, dest=dest, topic="reply", serder=serder, attachment=atc)