Source code for keri.app.storing

# -*- encoding: utf-8 -*-
"""
keri.app.storing module

"""

from hio.base import doing
from hio.help import decking, ogler
from ordered_set import OrderedSet as oset

from .forwarding import Poster

from ..core import SerderKERI, MtrDex, Diger, Prefixer
from ..db import LMDBer, OnSuber, Suber

logger = ogler.getLogger()


[docs] class Mailboxer(LMDBer): """ Mailboxer stores exn messages in order and provider iterator access at an index. """ TailDirPath = "keri/mbx" AltTailDirPath = ".keri/mbx" TempPrefix = "keri_mbx_"
[docs] def __init__(self, name="mbx", headDirPath=None, reopen=True, **kwa): """ Parameters: headDirPath: perm: reopen: kwa: Mailboxer uses two dbs for mailbox messages these are .tpcs and .msgs. The message index is in .tpcs (topics). Each .tpcs index key consists of topic.on where topic is bytes identifier or prefix/topic for message and on is serialized ordinal number to orders the appearance of a topic message. Each .tpcs val is the digest of the message. The message itself is stored in .msgs where the key is the msg digest and the value is the serialized messag itself. Multiple messages can share the same topic but with a different ordinal. """ self.tpcs = None self.msgs = None super(Mailboxer, self).__init__(name=name, headDirPath=headDirPath, reopen=reopen, **kwa)
[docs] def reopen(self, **kwa): """ :param kwa: :return: """ super(Mailboxer, self).reopen(**kwa) self.tpcs = OnSuber(db=self, subkey='tpcs.') self.msgs = Suber(db=self, subkey='msgs.') # key states return self.env
[docs] def delTopic(self, key, on=0): """Removes topic index from .tpcs without deleting message from .msgs Returns: result (boo): True if full key consisting of key and serialized on exists in database so removed. False otherwise (not removed). """ return self.tpcs.rem(keys=key, on=on)
[docs] def appendToTopic(self, topic, val): """Appends val to end of db entries with same topic but with on incremented by 1 relative to last preexisting entry at topic. Returns: on (int): order number int, on, of appended entry. Computes on as next on after last entry. Parameters: topic (bytes): topic identifier for message val (bytes): msg digest """ return self.tpcs.append(key=topic, val=val)
[docs] def getTopicMsgs(self, topic, fn=0): """ Returns: msgs (Iterable[bytes]): belonging to topic indices with same topic but all on >= fn i.e. all topic.on beginning with fn Parameters: topic (Option(bytes|str)): key prefix combined with serialized on to form full actual key. When key is empty then retrieves whole database. fn (int): starting index ordinal number used with onKey(pre,on) to form key at at which to initiate retrieval """ msgs = [] for keys, on, dig in self.tpcs.getAllItemIter(keys=topic, on=fn): if msg := self.msgs.get(keys=dig): msgs.append(msg.encode()) # want bytes not str return msgs
[docs] def storeMsg(self, topic, msg): """ Add exn event to mailbox topic and on that is 1 greater than last msg at topic. Returns: result (bool): True if msg successfully stored and indexed at topic False otherwise Parameters: topic (str | bytes): topic (Option(bytes|str)): key prefix combined with serialized on to form full actual key. msg (bytes): serialized message """ if hasattr(msg, "encode"): msg = msg.encode("utf-8") digb = Diger(ser=msg, code=MtrDex.Blake3_256).qb64b on = self.tpcs.append(keys=topic, val=digb) return self.msgs.pin(keys=digb, val=msg)
[docs] def cloneTopicIter(self, topic, fn=0): """ Returns: triple (Iterator[(on, topic, msg)]): iterator of messages at topic beginning with ordinal fn. topic (Option(bytes|str)): key prefix combined with serialized on to form full actual key. When key is empty then retrieves whole database. fn (int): starting index ordinal number used with onKey(pre,on) to form key at at which to initiate retrieval ToDo looks like misuse of IoSet this should not be IoSet but simply Ordinal Numbered db. since should not be using hidden ion has not hidden. """ for keys, on, dig in self.tpcs.getAllItemIter(keys=topic, on=fn): if msg := self.msgs.get(keys=dig): yield (on, topic, msg.encode("utf-8"))
[docs] class Respondant(doing.DoDoer): """ Respondant processes buffer of response messages from inbound 'exn' messages and routes them to the appropriate mailbox. If destination has witnesses, send response to one of the (randomly selected) witnesses. Otherwise store the response in the recipients mailbox locally. """
[docs] def __init__(self, hby, reps=None, cues=None, mbx=None, aids=None, **kwa): """ Creates Respondant that uses local environment to find the destination KEL and stores peer to peer messages in mbx, the mailboxer Parameters: hab (Habitat): local environment mbx (Mailboxer): storage for local messages """ self.reps = reps if reps is not None else decking.Deck() self.cues = cues if cues is not None else decking.Deck() self.hby = hby self.aids = aids self.mbx = mbx if mbx is not None else Mailboxer(name=self.hby.name) self.postman = Poster(hby=self.hby, mbx=self.mbx) doers = [self.postman, doing.doify(self.responseDo), doing.doify(self.cueDo)] super(Respondant, self).__init__(doers=doers, **kwa)
[docs] def responseDo(self, tymth=None, tock=0.0, **kwa): """ Doifiable Doist compatibile generator method to process response messages from `exn` handlers. If dest is not in local environment, ignore the response (for now). If dest has witnesses, pick one at random and send the response to that witness for storage in the recipients mailbox on that witness. Otherwise this is a peer to peer HTTP message and should be stored in a mailbox locally for the recipient. Usage: add result of doify on this method to doers list """ # enter context self.wind(tymth) self.tock = tock _ = (yield self.tock) while True: while self.reps: rep = self.reps.popleft() sender = rep["src"] recipient = rep["dest"] exn = rep["rep"] topic = rep["topic"] if recipient not in self.hby.kevers: logger.error("unable to reply, dest {} not found".format(recipient)) continue senderHab = self.hby.habs[sender] if senderHab.mhab: forwardHab = senderHab.mhab else: forwardHab = senderHab # sign the exn to get the signature eattach = senderHab.endorse(exn, last=False, framed=True) del eattach[:exn.size] logger.info("Sending exn on %s from %s to %s", topic, sender, recipient) logger.debug("xn body=\n%s\n", exn.pretty()) self.postman.send(recipient, topic=topic, serder=exn, hab=forwardHab, attachment=eattach) yield self.tock # throttle just do one cue at a time yield self.tock
[docs] def cueDo(self, tymth=None, tock=0.0, **kwa): """ Returns doifiable Doist compatibile generator method (doer dog) to process Kevery and Tevery cues deque Usage: add result of doify on this method to doers list """ # enter context self.wind(tymth) self.tock = tock _ = (yield self.tock) while True: while not self.cues: yield self.tock cue = self.cues.pull() # self.cues.popleft() cueKin = cue["kin"] # type or kind of cue if cueKin in ("receipt",): # cue to receipt a received event from other pre serder = cue["serder"] # Serder of received event for other pre cuedKed = serder.ked cuedPrefixer = Prefixer(qb64=cuedKed["i"]) # If respondant configured with list of acceptable AIDs to witness for, check them here if self.aids is not None and cuedPrefixer.qb64 not in self.aids: continue if cuedPrefixer.qb64 in self.hby.kevers: kever = self.hby.kevers[cuedPrefixer.qb64] owits = oset(kever.wits) if match := owits.intersection(self.hby.prefixes): pre = match.pop() hab = self.hby.habByPre(pre) if hab is None: continue raw = hab.receipt(serder, framed=True) rserder = SerderKERI(raw=raw) del raw[:rserder.size] self.postman.send(serder.pre, topic="receipt", serder=rserder, hab=hab, attachment=raw) elif cueKin in ("replay",): src = cue["src"] dest = cue["dest"] msgs = cue["msgs"] hab = self.hby.habByPre(src) if hab is None: continue if dest not in self.hby.kevers: continue for msg in msgs: raw = bytearray(msg) serder = SerderKERI(raw=raw) del raw[:serder.size] self.postman.send(dest, topic="replay", serder=serder, hab=hab, attachment=raw) elif cueKin in ("reply",): src = cue["src"] serder = cue["serder"] dest = cue["dest"] if dest not in self.hby.kevers: continue hab = self.hby.habByPre(src) if hab is None: continue atc = hab.endorse(serder, framed=False) del atc[:serder.size] self.postman.send(hab=hab, dest=dest, topic="reply", serder=serder, attachment=atc) else: self.cues.push(cue) yield self.tock