Source code for keri.app.indirecting

# -*- encoding: utf-8 -*-
"""
KERI
keri.app.indirecting module

simple indirect mode demo support classes
"""
import datetime

import falcon
import time
import sys
import traceback
from ordered_set import OrderedSet as oset

from hio.base import doing
from hio.core import http, tcp
from hio.core.tcp import serving
from hio.help import decking

import keri.app.oobiing
from . import directing, storing, httping, forwarding, agenting, oobiing
from .habbing import GroupHab
from .. import help, kering
from ..core import eventing, parsing, routing, coring, serdering
from ..core.coring import Ilks
from ..db import basing, dbing
from ..end import ending
from ..help import helping
from ..peer import exchanging
from ..vdr import verifying, viring
from ..vdr.eventing import Tevery

logger = help.ogler.getLogger()


def setupWitness(hby, alias="witness", mbx=None, aids=None, tcpPort=5631, httpPort=5632,
                 keypath=None, certpath=None, cafilepath=None):
    """
    Build the witness controller and return the doers that run it.

    Parameters:
        hby: habery providing .habByName, .makeHab, .db and .temp
        alias (str): name of the witness hab. When no hab with that name
            exists a non-transferable one is created.
        mbx: mailbox storage. When None a storing.Mailboxer named after
            alias is created.
        aids: passed unchanged to storing.Respondant and ReceiptEnd
        tcpPort (int): port of the TCP server, None means no TCP server
        httpPort (int): port of the HTTP(S) server
        keypath (str): file path of the TLS private key
        certpath (str): file path of the TLS signed certificate
        cafilepath (str): file path of the TLS CA certificate chain file

    Returns:
        list: doers for the caller to schedule

    Raises:
        RuntimeError: when the HTTP or the TCP server cannot be (re)opened
    """
    cues = decking.Deck()

    # reuse the hab registered under alias, otherwise make one
    hab = hby.habByName(name=alias)
    if hab is None:
        hab = hby.makeHab(name=alias, transferable=False)

    registry = viring.Reger(name=hab.name, db=hab.db, temp=False)
    verifier = verifying.Verifier(hby=hby, reger=registry)

    if mbx is None:
        mbx = storing.Mailboxer(name=alias, temp=hby.temp)

    forwardHandler = forwarding.ForwardHandler(hby=hby, mbx=mbx)
    exc = exchanging.Exchanger(hby=hby, handlers=[forwardHandler])
    oobiery = keri.app.oobiing.Oobiery(hby=hby, clienter=httping.Clienter())

    # falcon app with the well known and oobi endpoints
    app = falcon.App(cors_enable=True)
    ending.loadEnds(app=app, hby=hby, default=hab.pre)
    oobiing.loadEnds(app=app, hby=hby, prefix="/ext")

    respondant = storing.Respondant(hby=hby, mbx=mbx, aids=aids)

    # message processors, all feeding the same cue deck
    rvy = routing.Revery(db=hby.db, cues=cues)
    kvy = eventing.Kevery(db=hby.db, lax=True, local=False, rvy=rvy, cues=cues)
    kvy.registerReplyRoutes(router=rvy.rtr)
    tvy = Tevery(reger=verifier.reger, db=hby.db, local=False, cues=cues)
    tvy.registerReplyRoutes(router=rvy.rtr)

    parser = parsing.Parser(framed=True, kvy=kvy, tvy=tvy, exc=exc, rvy=rvy)

    # the http endpoint writes straight into the parser's input buffer
    httpEnd = HttpEnd(rxbs=parser.ims, mbx=mbx)
    app.add_route("/", httpEnd)
    receiptEnd = ReceiptEnd(hab=hab, inbound=cues, aids=aids)
    app.add_route("/receipts", receiptEnd)

    httpServer = createHttpServer(httpPort, app, keypath, certpath, cafilepath)
    if not httpServer.reopen():
        raise RuntimeError(f"cannot create http server on port {httpPort}")
    httpServerDoer = http.ServerDoer(server=httpServer)

    regDoer = basing.BaserDoer(baser=verifier.reger)

    doers = []
    if tcpPort is not None:
        tcpServer = serving.Server(host="", port=tcpPort)
        if not tcpServer.reopen():
            raise RuntimeError(f"cannot create tcp server on port {tcpPort}")
        tcpServerDoer = serving.ServerDoer(server=tcpServer)
        directant = directing.Directant(hab=hab, server=tcpServer, verifier=verifier)
        doers.extend([directant, tcpServerDoer])

    # cues filtered by the receipt endpoint are what the witness acts on
    witStart = WitnessStart(hab=hab, parser=parser, cues=receiptEnd.outbound,
                            kvy=kvy, tvy=tvy, rvy=rvy, exc=exc,
                            replies=respondant.reps, responses=respondant.cues,
                            queries=httpEnd.qrycues)

    doers.extend([regDoer, httpServerDoer, respondant, witStart, receiptEnd, *oobiery.doers])
    return doers
def createHttpServer(port, app, keypath=None, certpath=None, cafilepath=None):
    """
    Create an HTTP server, or an HTTPS server when all TLS key material is given.

    Parameters:
        port (int): port to listen on
        app (falcon.App): application instance handed to http.Server
        keypath (str): file path of the TLS private key
        certpath (str): file path of the TLS signed certificate (public key)
        cafilepath (str): file path of the TLS CA certificate chain file

    Returns:
        hio.core.http.Server: TLS backed only when keypath, certpath and
        cafilepath are all not None
    """
    tlsPaths = (keypath, certpath, cafilepath)

    # any missing piece of TLS material means plain HTTP
    if any(path is None for path in tlsPaths):
        return http.Server(port=port, app=app)

    servant = tcp.ServerTls(certify=False,
                            keypath=keypath,
                            certpath=certpath,
                            cafilepath=cafilepath,
                            port=port)
    return http.Server(port=port, app=app, servant=servant)
class WitnessStart(doing.DoDoer):
    """
    DoDoer of a running witness.

    Prints the witness prefix once its hab is initialized and runs the
    message parsing, escrow processing and cue routing generators.
    """

    def __init__(self, hab, parser, kvy, tvy, rvy, exc, cues=None, replies=None,
                 responses=None, queries=None, **opts):
        """
        Parameters:
            hab: hab of the witness, read for .inited, .name and .pre
            parser: parser whose .parsator() is run by .msgDo
            kvy: processor whose .processEscrows() is run by .escrowDo
            tvy: like kvy, may be None
            rvy: processor whose .processEscrowReply() is run by .escrowDo
            exc: exchanger whose .processEscrow() is run by .escrowDo
            cues (Deck): incoming cues routed by .cueDo
            replies (Deck): stored on the instance only
            responses (Deck): receives every cue whose kin is not "stream"
            queries (Deck): receives every cue whose kin is "stream"
            **opts: forwarded to doing.DoDoer
        """
        self.hab = hab
        self.parser = parser
        self.kvy = kvy
        self.tvy = tvy
        self.rvy = rvy
        self.exc = exc

        # each queue defaults to its own fresh Deck
        if queries is None:
            queries = decking.Deck()
        if replies is None:
            replies = decking.Deck()
        if responses is None:
            responses = decking.Deck()
        if cues is None:
            cues = decking.Deck()
        self.queries = queries
        self.replies = replies
        self.responses = responses
        self.cues = cues

        methods = (self.start, self.msgDo, self.escrowDo, self.cueDo)
        super().__init__(doers=[doing.doify(method) for method in methods], **opts)

    def start(self, tymth=None, tock=0.0):
        """
        Generator method that prints witness name and prefix once the hab is inited.

        Parameters:
            tymth (function): injected function wrapper closure returned by .tymen() of
                Tymist instance. Calling tymth() returns associated Tymist .tyme.
            tock (float): injected initial tock value
        """
        self.wind(tymth)
        self.tock = tock
        yield self.tock

        # wait for the hab to finish initializing
        while not self.hab.inited:
            yield self.tock

        print("Witness", self.hab.name, ":", self.hab.pre)

    def msgDo(self, tymth=None, tock=0.0):
        """
        Generator method that runs the parser over the incoming message stream.

        Parameters:
            tymth (function): injected function wrapper closure returned by .tymen() of
                Tymist instance. Calling tymth() returns associated Tymist .tyme.
            tock (float): injected initial tock value

        Usage:
            add result of doify on this method to doers list
        """
        self.wind(tymth)
        self.tock = tock
        yield self.tock

        pending = self.parser.ims
        if pending:
            logger.info("Client %s received:\n%s\n...\n", self.kvy, pending[:1024])

        # delegates to the parser generator and returns whatever it returns
        return (yield from self.parser.parsator())

    def escrowDo(self, tymth=None, tock=0.0):
        """
        Generator method that processes the escrows of kvy, rvy, tvy and exc
        once per cycle.

        Parameters:
            tymth (function): injected function wrapper closure returned by .tymen() of
                Tymist instance. Calling tymth() returns associated Tymist .tyme.
            tock (float): injected initial tock value

        Usage:
            add result of doify on this method to doers list
        """
        self.wind(tymth)
        self.tock = tock
        yield self.tock

        while True:
            self.kvy.processEscrows()
            self.rvy.processEscrowReply()
            if self.tvy is not None:
                self.tvy.processEscrows()
            self.exc.processEscrow()
            yield

    def cueDo(self, tymth=None, tock=0.0):
        """
        Generator method that routes each cue in .cues to .queries (kin
        "stream") or to .responses (any other kin), one cue per cycle.

        Parameters:
            tymth (function): injected function wrapper closure returned by .tymen() of
                Tymist instance. Calling tymth() returns associated Tymist .tyme.
            tock (float): injected initial tock value

        Usage:
            add result of doify on this method to doers list
        """
        self.wind(tymth)
        self.tock = tock
        yield self.tock

        while True:
            while self.cues:
                cue = self.cues.pull()
                target = self.queries if cue["kin"] == "stream" else self.responses
                target.append(cue)
                yield self.tock
            yield self.tock
class Indirector(doing.DoDoer):
    """
    Base class for Indirect Mode KERI Controller Doer with habitat and TCP
    Client for talking to witnesses.

    Subclass of DoDoer whose doers list is built from the generator methods
    .msgDo and .escrowDo, plus .cueDo when in direct mode.

    Attributes:
        hab: local controller's context
        client: TCP client used for both receiving (.rxbs) and sending (.tx).
            Assumes it is operated by another doer.
        direct (bool): True means cues are turned into messages and sent
        kevery (Kevery): event processor created for this instance
        parser (Parser): parser reading from client.rxbs into .kevery
    """

    def __init__(self, hab, client, direct=True, doers=None, **kwa):
        """
        Initialize instance.

        Parameters:
            hab: Habitat instance of local controller's context
            client: TCP Client instance
            direct (bool): True means direct mode, process cue'ed receipts.
                False means indirect mode, don't process cue'ed receipts.
            doers (list): optional doers to run as well. The list is extended
                in place with this instance's own generator methods.
            **kwa: forwarded to doing.DoDoer
        """
        self.hab = hab
        self.client = client  # one client for both rx and tx
        self.direct = bool(direct)

        self.kevery = eventing.Kevery(db=self.hab.db,
                                      lax=False,
                                      local=False,
                                      cloned=not self.direct,
                                      direct=self.direct)
        self.parser = parsing.Parser(ims=self.client.rxbs, framed=True, kvy=self.kevery)

        if doers is None:
            doers = []
        doers.extend([doing.doify(self.msgDo), doing.doify(self.escrowDo)])
        if self.direct:
            doers.append(doing.doify(self.cueDo))

        super().__init__(doers=doers, **kwa)

        # keep the client on the same time base when one is already injected
        if self.tymth:
            self.client.wind(self.tymth)

    def wind(self, tymth):
        """
        Inject new tymist.tymth as new ._tymth on this instance and on .client.
        """
        super().wind(tymth)
        self.client.wind(tymth)

    def msgDo(self, tymth=None, tock=0.0):
        """
        Generator method that runs the parser over the incoming message stream.

        Parameters:
            tymth is injected function wrapper closure returned by .tymen() of
                Tymist instance. Calling tymth() returns associated Tymist .tyme.
            tock is injected initial tock value

        Usage:
            add result of doify on this method to doers list
        """
        self.wind(tymth)
        self.tock = tock
        yield self.tock

        pending = self.parser.ims
        if pending:
            logger.info("Client %s received:\n%s\n...\n", self.hab.pre, pending[:1024])

        # delegates to the parser generator and returns whatever it returns
        return (yield from self.parser.parsator())

    def cueDo(self, tymth=None, tock=0.0):
        """
        Generator method that sends the messages the hab produces from
        .kevery.cues, one message per cycle.

        Parameters:
            tymth is injected function wrapper closure returned by .tymen() of
                Tymist instance. Calling tymth() returns associated Tymist .tyme.
            tock is injected initial tock value

        Usage:
            add result of doify on this method to doers list
        """
        self.wind(tymth)
        self.tock = tock
        yield self.tock

        while True:
            for message in self.hab.processCuesIter(self.kevery.cues):
                self.sendMessage(message, label="chit or receipt")
                yield  # throttle, one cue per cycle
            yield

    def escrowDo(self, tymth=None, tock=0.0):
        """
        Generator method that processes .kevery escrows once per cycle.

        Parameters:
            tymth is injected function wrapper closure returned by .tymen() of
                Tymist instance. Calling tymth() returns associated Tymist .tyme.
            tock is injected initial tock value

        Usage:
            add result of doify on this method to doers list
        """
        self.wind(tymth)
        self.tock = tock
        yield self.tock

        while True:
            self.kevery.processEscrows()
            yield

    def sendMessage(self, msg, label=""):
        """
        Send msg through .client and log it together with label.
        """
        self.client.tx(msg)
        logger.info("%s sent %s:\n%s\n\n", self.hab.pre, label, bytes(msg))
class MailboxDirector(doing.DoDoer):
    """
    Class for Indirect Mode KERI Controller Doer that polls mailboxes and
    witnesses for messages and feeds them to a parser.

    Subclass of DoDoer with doers list from do generator methods:
        .pollDo, .msgDo and .escrowDo.

    Attributes:
        hby: habery whose .habs are polled for and whose .db is used
        verifier: optional credential verifier
        exchanger: optional exchanger handed to the parser
        rep: stored only, not used by this class
        topics: topics handed to every Poller
        pollers (list): Poller instances created so far
        prefixes (OrderedSet): prefixes for which pollers were already added
        cues (Deck): cues shared with the created Kevery and Tevery
        ims (bytearray): input message stream, polled messages are appended
        rtr (Router): reply router
        rvy (Revery): reply message processor
        kvy (Kevery): key event processor
        tvy (Tevery): transaction event processor, None without a verifier
        parser (Parser): parser reading from .ims

    Properties:
        times (dict): merge of the .times of all pollers
    """

    def __init__(self, hby, topics, ims=None, verifier=None, kvy=None, exc=None, rep=None, cues=None, rvy=None,
                 tvy=None, **kwa):
        """
        Initialize instance.

        Parameters:
            hby: habery providing .habs and .db
            topics: topics handed to every Poller
            ims (bytearray): input message stream, a new one is made when None
            verifier: optional verifier. Without one no Tevery is used, even
                when tvy is given.
            kvy (Kevery): optional key event processor, made when None
            exc: optional exchanger handed to the parser
            rep: stored as .rep
            cues (Deck): cue deck, a new one is made when None
            rvy (Revery): optional reply processor, made when None
            tvy (Tevery): optional transaction event processor, made when
                None and a verifier is given
            **kwa: forwarded to doing.DoDoer
        """
        self.hby = hby
        self.verifier = verifier
        self.exchanger = exc
        self.rep = rep
        self.topics = topics
        self.pollers = list()
        self.prefixes = oset()
        self.cues = cues if cues is not None else decking.Deck()
        self.ims = ims if ims is not None else bytearray()

        doers = [doing.doify(self.pollDo), doing.doify(self.msgDo), doing.doify(self.escrowDo)]

        self.rtr = routing.Router()
        # NOTE(review): the Revery receives the cues argument as given (possibly
        # None), not self.cues -- confirm whether sharing self.cues is intended
        self.rvy = rvy if rvy is not None else routing.Revery(db=self.hby.db, rtr=self.rtr, cues=cues,
                                                              lax=True, local=False)

        self.kvy = kvy if kvy is not None else eventing.Kevery(db=self.hby.db,
                                                               cues=self.cues,
                                                               rvy=self.rvy,
                                                               lax=True,
                                                               local=False,
                                                               direct=False)
        self.kvy.registerReplyRoutes(self.rtr)

        if self.verifier is not None:
            self.tvy = tvy if tvy is not None else Tevery(reger=self.verifier.reger,
                                                          db=self.hby.db, rvy=self.rvy,
                                                          lax=True, local=False, cues=self.cues)
            self.tvy.registerReplyRoutes(self.rtr)
        else:
            self.tvy = None

        self.parser = parsing.Parser(ims=self.ims,
                                     framed=True,
                                     kvy=self.kvy,
                                     tvy=self.tvy,
                                     exc=self.exchanger,
                                     rvy=self.rvy,
                                     vry=self.verifier)

        super(MailboxDirector, self).__init__(doers=doers, **kwa)

    def wind(self, tymth):
        """
        Inject new tymist.tymth as new ._tymth. Changes tymist.tyme base.
        """
        super(MailboxDirector, self).wind(tymth)

    def pollDo(self, tymth=None, tock=0.0):
        """
        Generator method that adds pollers for accepted habs and moves the
        messages they collected into .ims.

        Returns:
           doifiable Doist compatible generator method

        Usage:
            add result of doify on this method to doers list
        """
        # enter context
        self.wind(tymth)
        self.tock = tock
        _ = (yield self.tock)

        # snapshot, .habs may change while this generator is suspended
        habs = list(self.hby.habs.values())
        for hab in habs:
            if hab.accepted:
                self.addPollers(hab)
            _ = (yield self.tock)

        while True:
            # habs that showed up since the last cycle; a hab that is not yet
            # accepted stays "new" and is checked again next cycle
            pres = oset(self.hby.habs.keys())
            if new := pres - self.prefixes:
                for pre in new:
                    hab = self.hby.habs[pre]
                    if hab.accepted:
                        self.addPollers(hab=hab)
                    _ = (yield self.tock)

            for msg in self.processPollIter():
                self.ims.extend(msg)
                _ = (yield self.tock)

            _ = (yield self.tock)

    def addPollers(self, hab):
        """
        Add a poller for every allowed mailbox end and for every witness of
        the hab, then mark the hab's prefix as handled.

        Parameters:
            hab (Hab): the Hab of the prefix
        """
        for (_, _, eid), end in hab.db.ends.getItemIter(keys=(hab.pre, kering.Roles.mailbox)):
            if end.allowed:
                self.addPoller(hab, witness=eid)

        for wit in hab.kever.wits:
            self.addPoller(hab, witness=wit)

        self.prefixes.add(hab.pre)

    def addPoller(self, hab, witness):
        """
        Create one Poller for hab at witness, remember it in .pollers and
        schedule it as a doer of this DoDoer.

        Parameters:
            hab (Hab): the Hab of the prefix
            witness: identifier of the witness or mailbox to poll
        """
        poller = Poller(hab=hab, topics=self.topics, witness=witness)
        self.pollers.append(poller)
        self.extend([poller])

    def processPollIter(self):
        """
        Drain the message queues of all pollers and yield the messages one by
        one. All queues are drained before the first message is yielded, so
        messages arriving while the caller is suspended wait for the next call.
        """
        mail = []
        for poller in self.pollers:
            while poller.msgs:
                mail.append(poller.msgs.popleft())

        # plain iteration instead of repeated list.pop(0), which is O(n) per pop
        yield from mail

    def msgDo(self, tymth=None, tock=0.0):
        """
        Generator method that runs the parser over the incoming message stream.

        Parameters:
            tymth is injected function wrapper closure returned by .tymen() of
                Tymist instance. Calling tymth() returns associated Tymist .tyme.
            tock is injected initial tock value

        Usage:
            add result of doify on this method to doers list
        """
        self.wind(tymth)
        self.tock = tock
        _ = (yield self.tock)

        done = yield from self.parser.parsator()  # process messages continuously
        return done  # should never get here except forced close

    def escrowDo(self, tymth=None, tock=0.0):
        """
        Generator method that processes the escrows of kvy, rvy and, when
        present, of the exchanger, tvy and verifier once per cycle.

        Parameters:
            tymth is injected function wrapper closure returned by .tymen() of
                Tymist instance. Calling tymth() returns associated Tymist .tyme.
            tock is injected initial tock value

        Usage:
            add result of doify on this method to doers list
        """
        self.wind(tymth)
        self.tock = tock
        _ = (yield self.tock)

        while True:
            self.kvy.processEscrows()
            self.rvy.processEscrowReply()
            if self.exchanger is not None:
                self.exchanger.processEscrow()
            if self.tvy is not None:
                self.tvy.processEscrows()
            if self.verifier is not None:
                self.verifier.processEscrows()

            yield

    @property
    def times(self):
        """
        dict: merge of the .times of all pollers, later pollers win on equal keys
        """
        times = dict()
        for poller in self.pollers:
            times |= poller.times

        return times
class Poller(doing.DoDoer):
    """
    Polls remote SSE endpoint for event that are KERI messages to be processed

    Attributes:
        hab: hab on whose behalf the mailbox is polled
        pre (str): prefix of hab
        witness: identifier of the witness or mailbox that is polled
        topics: topics to ask for
        retry: delay between polling rounds, divided by 1000 before it is
            yielded (presumably milliseconds). Updated from the "retry"
            field of received events.
        msgs (Deck): received messages, encoded as bytes
        times (dict): per topic, the time the last message was received
    """

    def __init__(self, hab, witness, topics, msgs=None, retry=1000, **kwa):
        """
        Returns doist compatible doing.Doer that polls a witness for mailbox messages
        as SSE events

        Parameters:
            hab: hab on whose behalf the mailbox is polled
            witness: identifier of the witness or mailbox to poll
            topics: topics to ask for
            msgs (Deck): queue for received messages, a new Deck when None
            retry: initial delay between polling rounds, see class docstring
            **kwa: forwarded to doing.DoDoer
        """
        self.hab = hab
        self.pre = hab.pre
        self.witness = witness
        self.topics = topics
        self.retry = retry
        # use the caller's queue when given (was inverted and stored None)
        self.msgs = msgs if msgs is not None else decking.Deck()
        self.times = dict()

        doers = [doing.doify(self.eventDo)]

        super(Poller, self).__init__(doers=doers, **kwa)

    def eventDo(self, tymth=None, tock=0.0):
        """
        Generator method that repeatedly queries the witness mailbox and
        collects the returned events into .msgs.

        Each round creates an http client, sends one "mbx" query asking for
        the next index of every topic, reads events for 30 seconds, then
        drops the client and waits retry / 1000 before the next round.

        Returns:
           doifiable Doist compatible generator method

        Usage:
            add result of doify on this method to doers list
        """
        self.wind(tymth)
        self.tock = tock
        _ = (yield self.tock)

        # last index seen per topic for this (prefix, witness) pair
        witrec = self.hab.db.tops.get((self.pre, self.witness))
        if witrec is None:
            witrec = basing.TopicsRecord(topics=dict())

        window = datetime.timedelta(seconds=30)  # lifetime of one client

        while self.retry > 0:
            try:
                client, clientDoer = agenting.httpClient(self.hab, self.witness)
            except kering.MissingEntryError as e:
                traceback.print_exception(e, file=sys.stderr)  # logging
                yield self.tock
                continue

            self.extend([clientDoer])

            # ask for the message after the last one seen, or from the start
            topics = dict()
            q = dict(pre=self.pre, topics=topics)
            for topic in self.topics:
                if topic in witrec.topics:
                    topics[topic] = witrec.topics[topic] + 1
                else:
                    topics[topic] = 0

            # a group hab queries through its member hab
            if isinstance(self.hab, GroupHab):
                msg = self.hab.mhab.query(pre=self.pre, src=self.witness, route="mbx", query=q)
            else:
                msg = self.hab.query(pre=self.pre, src=self.witness, route="mbx", query=q)

            httping.createCESRRequest(msg, client, dest=self.witness)

            # wait until the request went out
            while client.requests:
                yield self.tock

            created = helping.nowUTC()
            while True:
                now = helping.nowUTC()
                if now - created > window:
                    self.remove([clientDoer])
                    break

                while client.events:
                    evt = client.events.popleft()
                    if "retry" in evt:
                        self.retry = evt["retry"]
                    if "id" not in evt or "data" not in evt or "name" not in evt:
                        logger.error(f"bad mailbox event: {evt}")
                        continue
                    idx = evt["id"]
                    msg = evt["data"]
                    tpc = evt["name"]

                    if not idx or not msg or not tpc:
                        logger.error(f"bad mailbox event: {evt}")
                        continue

                    self.msgs.append(msg.encode("utf-8"))
                    yield self.tock

                    # remember how far this topic was read
                    witrec.topics[tpc] = int(idx)
                    self.times[tpc] = helping.nowUTC()
                    self.hab.db.tops.pin((self.pre, self.witness), witrec)

                yield 0.25

            yield self.retry / 1000
class HttpEnd:
    """
    HTTP handler that accepts KERI events POSTed as the body of a request with
    all attachments to the message as a CESR attachment HTTP header. Received
    messages are appended to .rxbs for processing elsewhere. A `qry` message
    with route `mbx` is answered with a server sent event stream.
    """

    TimeoutQNF = 30
    TimeoutMBX = 5

    def __init__(self, rxbs=None, mbx=None, qrycues=None):
        """
        Parameters:
            rxbs (bytearray): output queue of bytes for message processing,
                a new bytearray when None
            mbx (Mailboxer): Mailbox storage
            qrycues (Deck): inbound qry response queues, a new Deck when None
        """
        if rxbs is None:
            rxbs = bytearray()
        if qrycues is None:
            qrycues = decking.Deck()

        self.rxbs = rxbs
        self.mbx = mbx
        self.qrycues = qrycues

    def on_post(self, req, rep):
        """
        Handles POST for KERI event messages.

        Parameters:
              req (Request) Falcon HTTP request
              rep (Response) Falcon HTTP response

        ---
        summary:  Accept KERI events with attachment headers and parse
        description:  Accept KERI events with attachment headers and parse.
        tags:
           - Events
        requestBody:
           required: true
           content:
             application/json:
               schema:
                 type: object
                 description: KERI event message
        responses:
           200:
              description: Mailbox query response for server sent events
           204:
              description: KEL or EXN event accepted.
        """
        if req.method == "OPTIONS":
            rep.status = falcon.HTTP_200
            return

        rep.set_header('Cache-Control', "no-cache")
        rep.set_header('connection', "close")

        cr = httping.parseCesrHttpRequest(req=req)
        sadder = coring.Sadder(ked=cr.payload, kind=eventing.Serials.json)

        # message followed by its attachments goes to the parser's buffer
        self.rxbs.extend(bytearray(sadder.raw) + cr.attachments.encode("utf-8"))

        if sadder.proto == "ACDC":
            accepted = True
        else:
            ilk = sadder.ked["t"]
            if ilk in (Ilks.icp, Ilks.rot, Ilks.ixn, Ilks.dip, Ilks.drt, Ilks.exn, Ilks.rpy,
                       Ilks.vcp, Ilks.vrt, Ilks.iss, Ilks.rev, Ilks.bis, Ilks.brv):
                accepted = True
            elif ilk == Ilks.qry:
                if sadder.ked["r"] == "mbx":
                    # mailbox query, answer with an event stream
                    rep.set_header('Content-Type', "text/event-stream")
                    rep.status = falcon.HTTP_200
                    rep.stream = QryRpyMailboxIterable(mbx=self.mbx, cues=self.qrycues, said=sadder.said)
                    return
                accepted = True
            else:
                accepted = False  # any other ilk leaves the response untouched

        if accepted:
            rep.set_header('Content-Type', "application/json")
            rep.status = falcon.HTTP_204

    def on_put(self, req, rep):
        """
        Handles PUT for KERI mbx event messages. The raw request body is
        appended to .rxbs.

        Parameters:
              req (Request) Falcon HTTP request
              rep (Response) Falcon HTTP response

        ---
        summary:  Accept KERI events with attachment headers and parse
        description:  Accept KERI events with attachment headers and parse.
        tags:
           - Events
        requestBody:
           required: true
           content:
             application/json:
               schema:
                 type: object
                 description: KERI event message
        responses:
           200:
              description: Mailbox query response for server sent events
           204:
              description: KEL or EXN event accepted.
        """
        if req.method == "OPTIONS":
            rep.status = falcon.HTTP_200
            return

        rep.set_header('Cache-Control', "no-cache")
        rep.set_header('connection', "close")

        body = req.bounded_stream.read()
        self.rxbs.extend(body)

        rep.set_header('Content-Type', "application/json")
        rep.status = falcon.HTTP_204
class QryRpyMailboxIterable:
    """
    Iterator that yields empty chunks until the cue answering the query with
    the given said shows up, then yields from a MailboxIterable built from
    that cue.
    """

    def __init__(self, cues, mbx, said, retry=5000):
        """
        Parameters:
            cues (Deck): cue queue shared with other requests
            mbx: mailbox storage handed to the MailboxIterable
            said: said of the query message this iterator answers
            retry: retry value handed to the MailboxIterable
        """
        self.mbx = mbx
        self.retry = retry
        self.cues = cues
        self.said = said
        self.iter = None  # set once the matching "stream" cue was found

    def __iter__(self):
        return self

    def __next__(self):
        # once streaming, everything comes from the mailbox iterator
        if self.iter is not None:
            return next(self.iter)

        if self.cues:
            cue = self.cues.pull()
            if cue["serder"].said != self.said:
                # belongs to another request, put it back
                self.cues.append(cue)
            elif cue["kin"] == "stream":
                mailbox = MailboxIterable(mbx=self.mbx, pre=cue["pre"], topics=cue["topics"],
                                          retry=self.retry)
                self.iter = iter(mailbox)

        return b''


class MailboxIterable:
    """
    Iterator that yields server sent event chunks with the messages stored in
    a mailbox for a prefix and a set of topics.
    """

    TimeoutMBX = 30000000

    def __init__(self, mbx, pre, topics, retry=5000):
        """
        Parameters:
            mbx: mailbox storage providing .cloneTopicIter(key, idx)
            pre (str): prefix, concatenated with each topic to form the key
            topics (dict): next index to read per topic, updated in place
            retry: value written into the retry field of the events
        """
        self.mbx = mbx
        self.pre = pre
        self.topics = topics
        self.retry = retry

    def __iter__(self):
        self.start = self.end = time.perf_counter()
        return self

    def __next__(self):
        # stop once no message was found for TimeoutMBX seconds
        if not (self.end - self.start < self.TimeoutMBX):
            raise StopIteration

        # very first chunk only announces the retry interval
        if self.start == self.end:
            self.end = time.perf_counter()
            return bytearray(f"retry: {self.retry}\n\n".encode("utf-8"))

        chunk = bytearray()
        for topic, idx in self.topics.items():
            for fn, _, msg in self.mbx.cloneTopicIter(self.pre + topic, idx):
                header = "id: {}\nevent: {}\nretry: {}\ndata: ".format(fn, topic, self.retry)
                chunk.extend(bytearray(header.encode("utf-8")))
                chunk.extend(msg)
                chunk.extend(b'\n\n')
                idx += 1
                # every message found restarts the timeout window
                self.start = time.perf_counter()

            self.topics[topic] = idx

        self.end = time.perf_counter()
        return chunk
class ReceiptEnd(doing.DoDoer):
    """
    Endpoint class for Witnessing receipting functionality

    Most times a witness will be able to return its receipt for an event inband. This API
    will provide that functionality. When an event needs to be escrowed, this POST API
    will return a 202 and also provides a generic GET API for retrieving a receipt for any
    event.

    Attributes:
        hab: hab of the witness
        inbound (Deck): cues to filter, consumed by .interceptDo
        outbound (Deck): cues passed on by .interceptDo
        aids: when not None, only events of these prefixes are receipted
        receipts (set): saids of events whose "receipt" cue is dropped by
            .interceptDo instead of being passed on
        psr (Parser): parser feeding the hab's Kevery
    """

    def __init__(self, hab, inbound=None, outbound=None, aids=None):
        """
        Parameters:
            hab: hab of the witness
            inbound (Deck): incoming cues, a new Deck when None
            outbound (Deck): outgoing cues, a new Deck when None
            aids: optional collection of prefixes allowed to be receipted
        """
        self.hab = hab
        self.inbound = inbound if inbound is not None else decking.Deck()
        self.outbound = outbound if outbound is not None else decking.Deck()
        self.aids = aids
        self.receipts = set()
        self.psr = parsing.Parser(framed=True,
                                  kvy=self.hab.kvy)

        super(ReceiptEnd, self).__init__(doers=[doing.doify(self.interceptDo)])

    def on_post(self, req, rep):
        """
        Receipt POST endpoint handler

        Parses the posted event. Responds 200 with the witness receipt when
        the event's prefix is known afterwards, 202 otherwise.

        Parameters:
            req (Request): Falcon HTTP request object
            rep (Response): Falcon HTTP response object

        Raises:
            falcon.HTTPBadRequest: prefix not in .aids, event type that cannot
                be receipted, or this hab is not a witness of the prefix
        """
        if req.method == "OPTIONS":
            rep.status = falcon.HTTP_200
            return

        rep.set_header('Cache-Control', "no-cache")
        rep.set_header('connection', "close")

        cr = httping.parseCesrHttpRequest(req=req)
        serder = serdering.SerderKERI(sad=cr.payload, kind=eventing.Serials.json)

        pre = serder.ked["i"]
        if self.aids is not None and pre not in self.aids:
            raise falcon.HTTPBadRequest(description=f"invalid AID={pre} for witnessing receipting")

        ilk = serder.ked["t"]
        if ilk not in (Ilks.icp, Ilks.rot, Ilks.ixn, Ilks.dip, Ilks.drt):
            raise falcon.HTTPBadRequest(description=f"invalid event type ({ilk})for receipting")

        msg = bytearray(serder.raw)
        msg.extend(cr.attachments.encode("utf-8"))

        self.psr.parseOne(ims=msg)

        if pre in self.hab.kevers:
            kever = self.hab.kevers[pre]
            wits = kever.wits

            if self.hab.pre not in wits:
                raise falcon.HTTPBadRequest(description=f"{self.hab.pre} is not a valid witness for {pre} event at "
                                                        f"{serder.sn}: wits={wits}")

            rct = self.hab.receipt(serder)

            # feed the receipt back through the parser as well
            self.psr.parseOne(bytes(rct))

            rep.set_header('Content-Type', "application/json+cesr")
            rep.status = falcon.HTTP_200
            rep.data = rct
        else:
            rep.status = falcon.HTTP_202

    def on_get(self, req, rep):
        """
        Receipt GET endpoint handler

        Query parameters: 'pre' (required) and one of 'sn' or 'said'. When
        both are given 'sn' wins.

        Parameters:
            req (Request): Falcon HTTP request object
            rep (Response): Falcon HTTP response object

        Raises:
            falcon.HTTPBadRequest: missing parameters, or this hab is not a
                witness of the event
            falcon.HTTPNotFound: no event found for the given parameters
        """
        pre = req.get_param("pre")
        sn = req.get_param_as_int("sn")
        said = req.get_param("said")

        if pre is None:
            raise falcon.HTTPBadRequest(description="query param 'pre' is required")

        preb = pre.encode("utf-8")

        if sn is None and said is None:
            raise falcon.HTTPBadRequest(description="either 'sn' or 'said' query param is required")

        if sn is not None:
            said = self.hab.db.getKeLast(key=dbing.snKey(pre=preb, sn=sn))

        if said is None:
            raise falcon.HTTPNotFound(description=f"event for {pre} at {sn} ({said}) not found")

        # a said from the query string is str and bytes(str) raises TypeError,
        # so it is encoded; a said from the database is converted as before
        said = said.encode("utf-8") if isinstance(said, str) else bytes(said)
        dgkey = dbing.dgKey(preb, said)  # get message
        if not (raw := self.hab.db.getEvt(key=dgkey)):
            raise falcon.HTTPNotFound(description="Missing event for dig={}.".format(said))

        serder = serdering.SerderKERI(raw=bytes(raw))
        if serder.sn > 0:
            wits = [wit.qb64 for wit in self.hab.kvy.fetchWitnessState(pre, serder.sn)]
        else:
            wits = serder.ked["b"]

        if self.hab.pre not in wits:
            raise falcon.HTTPBadRequest(description=f"{self.hab.pre} is not a valid witness for {pre} event at "
                                                    f"{serder.sn}, {wits}")

        # lookup by said only leaves sn None, take it from the event itself
        rserder = eventing.receipt(pre=pre,
                                   sn=sn if sn is not None else serder.sn,
                                   said=said.decode("utf-8"))
        rct = bytearray(rserder.raw)
        if wigs := self.hab.db.getWigs(key=dgkey):
            rct.extend(coring.Counter(code=coring.CtrDex.WitnessIdxSigs, count=len(wigs)).qb64b)
            for wig in wigs:
                rct.extend(wig)

        rep.set_header('Content-Type', "application/json+cesr")
        rep.status = falcon.HTTP_200
        rep.data = rct

    def interceptDo(self, tymth=None, tock=0.0):
        """
        Generator method that moves cues from .inbound to .outbound, one per
        cycle. A "receipt" cue whose event said is in .receipts is dropped and
        the said is removed from .receipts.

        Usage:
            add result of doify on this method to doers list
        """
        # enter context
        self.wind(tymth)
        self.tock = tock
        _ = (yield self.tock)

        while True:
            while self.inbound:
                cue = self.inbound.popleft()
                cueKin = cue["kin"]  # type or kind of cue

                if cueKin in ("receipt",):
                    serder = cue["serder"]  # Serder of received event
                    if serder.saidb in self.receipts:
                        self.receipts.remove(serder.saidb)
                    else:
                        self.outbound.append(cue)
                else:
                    self.outbound.append(cue)

                yield self.tock

            yield self.tock