Source code for keri.app.indirecting

# -*- encoding: utf-8 -*-
"""
KERI
keri.app.indirecting module

simple indirect mode demo support classes
"""
import datetime
import platform
import falcon
import time
import sys
import traceback
from ordered_set import OrderedSet as oset

from hio.base import doing
from hio.core import http, tcp
from hio.core.tcp import serving
from hio.help import decking, ogler

from ..kering import (Vrsn_1_0, Roles, Ilks, Kinds,
                      MissingEntryError)
from ..recording import TopicsRecord
from ..core import (Kevery, parsing, routing, coring, serdering,
                    Counter, receipt, Codens)
from ..db import BaserDoer
from ..end import loadEnds as loadEndingEnds
from ..help import nowUTC
from ..peer import Exchanger

from .habbing import GroupHab
from .directing import Directant
from .storing import Mailboxer, Respondant
from .httping import Clienter, createCESRRequest, parseCesrHttpRequest, CESR_CONTENT_TYPE
from .forwarding import ForwardHandler
from .agenting import httpClient
from .oobiing import Oobiery, loadEnds as loadOobiingEnds

logger = ogler.getLogger()


[docs] def setupWitness(hby, alias="witness", mbx=None, aids=None, tcpPort=5631, httpPort=5632, keypath=None, certpath=None, cafilepath=None, **kwa): """ Setup witness controller and doers """ host = "0.0.0.0" if platform.system() == "Windows": host = "127.0.0.1" cues = decking.Deck() doers = [] # make hab hab = hby.habByName(name=alias) if hab is None: hab = hby.makeHab(name=alias, transferable=False, **kwa) from ..vdr import Reger,Verifier # dynamic import because of circular import reger = Reger(name=hab.name, db=hab.db, temp=False) verfer = Verifier(hby=hby, reger=reger) mbx = mbx if mbx is not None else Mailboxer(name=alias, temp=hby.temp) forwarder = ForwardHandler(hby=hby, mbx=mbx) exchanger = Exchanger(hby=hby, handlers=[forwarder]) clienter = Clienter() oobiery = Oobiery(hby=hby, clienter=clienter) app = falcon.App(cors_enable=True) loadEndingEnds(app=app, hby=hby, default=hab.pre) loadOobiingEnds(app=app, hby=hby, prefix="/ext") rep = Respondant(hby=hby, mbx=mbx, aids=aids) rvy = routing.Revery(db=hby.db, cues=cues) kvy = Kevery(db=hby.db, lax=True, local=False, rvy=rvy, cues=cues) kvy.registerReplyRoutes(router=rvy.rtr) from ..vdr import Tevery # dynamic import because of circular import tvy = Tevery(reger=verfer.reger, db=hby.db, local=False, cues=cues) tvy.registerReplyRoutes(router=rvy.rtr) parser = parsing.Parser(framed=True, kvy=kvy, tvy=tvy, exc=exchanger, rvy=rvy, version=Vrsn_1_0) httpEnd = HttpEnd(rxbs=parser.ims, mbx=mbx) app.add_route("/", httpEnd) receiptEnd = ReceiptEnd(hab=hab, inbound=cues, aids=aids) app.add_route("/receipts", receiptEnd) queryEnd = QueryEnd(hab=hab, reger=reger) app.add_route("/query", queryEnd) server = createHttpServer(host, httpPort, app, keypath, certpath, cafilepath) if not server.reopen(): raise RuntimeError(f"cannot create http server on port {httpPort}") httpServerDoer = http.ServerDoer(server=server) # setup doers regDoer = BaserDoer(baser=reger) if tcpPort is not None: server = serving.Server(host="", port=tcpPort) if not server.reopen(): raise RuntimeError(f"cannot create tcp server on port {tcpPort}") serverDoer = serving.ServerDoer(server=server) directant = Directant(hab=hab, server=server, verifier=verfer) doers.extend([directant, serverDoer]) witStart = WitnessStart(hab=hab, parser=parser, cues=receiptEnd.outbound, kvy=kvy, tvy=tvy, rvy=rvy, exc=exchanger, replies=rep.reps, responses=rep.cues, queries=httpEnd.qrycues) doers.extend([regDoer, httpServerDoer, rep, witStart, receiptEnd, *oobiery.doers]) return doers
[docs] def createHttpServer(host, port, app, keypath=None, certpath=None, cafilepath=None): """ Create an HTTP or HTTPS server depending on whether TLS key material is present Parameters: host(str) : host to bind to for this server, or None for default of '0.0.0.0', all ifaces port (int) : port to listen on for all HTTP(s) server instances app (Any) : WSGI application instance to pass to the http.Server instance keypath (string) : the file path to the TLS private key certpath (string) : the file path to the TLS signed certificate (public key) cafilepath (string): the file path to the TLS CA certificate chain file Returns: hio.core.http.Server """ if keypath is not None and certpath is not None and cafilepath is not None: servant = tcp.ServerTls(certify=False, keypath=keypath, certpath=certpath, cafilepath=cafilepath, port=port) server = http.Server(host=host, port=port, app=app, servant=servant) else: server = http.Server(host=host, port=port, app=app) return server
[docs] class WitnessStart(doing.DoDoer): """ Doer to print witness prefix after initialization """ def __init__(self, hab, parser, kvy, tvy, rvy, exc, cues=None, replies=None, responses=None, queries=None, **opts): self.hab = hab self.parser = parser self.kvy = kvy self.tvy = tvy self.rvy = rvy self.exc = exc self.queries = queries if queries is not None else decking.Deck() self.replies = replies if replies is not None else decking.Deck() self.responses = responses if responses is not None else decking.Deck() self.cues = cues if cues is not None else decking.Deck() doers = [doing.doify(self.start), doing.doify(self.msgDo), doing.doify(self.escrowDo), doing.doify(self.cueDo)] super().__init__(doers=doers, **opts)
[docs] def start(self, tymth=None, tock=0.0, **kwa): """ Prints witness name and prefix Parameters: tymth (function): injected function wrapper closure returned by .tymen() of Tymist instance. Calling tymth() returns associated Tymist .tyme. tock (float): injected initial tock value """ self.wind(tymth) self.tock = tock _ = (yield self.tock) while not self.hab.inited: yield self.tock print("Witness", self.hab.name, ":", self.hab.pre)
[docs] def msgDo(self, tymth=None, tock=0.0, **kwa): """ Returns doifiable Doist compatibile generator method (doer dog) to process incoming message stream of .kevery Parameters: tymth (function): injected function wrapper closure returned by .tymen() of Tymist instance. Calling tymth() returns associated Tymist .tyme. tock (float): injected initial tock value Usage: add result of doify on this method to doers list """ self.wind(tymth) self.tock = tock _ = (yield self.tock) if self.parser.ims: logger.debug("Client %s received:\n%s\n...\n", self.kvy, self.parser.ims[:1024]) done = yield from self.parser.parsator(local=True) # process messages continuously return done # should nover get here except forced close
[docs] def escrowDo(self, tymth=None, tock=0.0, **kwa): """ Returns doifiable Doist compatibile generator method (doer dog) to process .kevery and .tevery escrows. Parameters: tymth (function): injected function wrapper closure returned by .tymen() of Tymist instance. Calling tymth() returns associated Tymist .tyme. tock (float): injected initial tock value Usage: add result of doify on this method to doers list """ self.wind(tymth) self.tock = tock _ = (yield self.tock) while True: self.kvy.processEscrows() self.rvy.processEscrowReply() if self.tvy is not None: self.tvy.processEscrows() self.exc.processEscrow() yield
[docs] def cueDo(self, tymth=None, tock=0.0, **kwa): """ Returns doifiable Doist compatibile generator method (doer dog) to process .kevery.cues deque Doist Injected Attributes: g.tock = tock # default tock attributes g.done = None # default done state g.opts Parameters: tymth is injected function wrapper closure returned by .tymen() of Tymist instance. Calling tymth() returns associated Tymist .tyme. tock is injected initial tock value Usage: add result of doify on this method to doers list """ self.wind(tymth) self.tock = tock _ = (yield self.tock) while True: while self.cues: cue = self.cues.pull() # self.cues.popleft() cueKin = cue["kin"] if cueKin == "stream": self.queries.append(cue) else: self.responses.append(cue) yield self.tock yield self.tock
[docs] class Indirector(doing.DoDoer): """ Base class for Indirect Mode KERI Controller Doer with habitat and TCP Clients for talking to witnesses Subclass of DoDoer with doers list from do generator methods: .msgDo, .cueDo, and .escrowDo. Enables continuous scheduling of doers (do generator instances or functions) Implements Doist like functionality to allow nested scheduling of doers. Each DoDoer runs a list of doers like a Doist but using the tyme from its injected tymist as injected by its parent DoDoer or Doist. Scheduling hierarchy: Doist->DoDoer...->DoDoer->Doers Attributes: .done is Boolean completion state: True means completed Otherwise incomplete. Incompletion maybe due to close or abort. .opts is dict of injected options for its generator .do .doers is list of Doers or Doer like generator functions Attributes: hab (Habitat: local controller's context client (serving.Client): hio TCP client instance. Assumes operated by another doer. Properties: tyme (float): relative cycle time of associated Tymist, obtained via injected .tymth function wrapper closure. tymth (function): function wrapper closure returned by Tymist .tymeth() method. When .tymth is called it returns associated Tymist .tyme. .tymth provides injected dependency on Tymist tyme base. tock (float): desired time in seconds between runs or until next run, non negative, zero means run asap Methods: .wind injects ._tymth dependency from associated Tymist to get its .tyme .__call__ makes instance callable Appears as generator function that returns generator .do is generator method that returns generator .enter is enter context action method .recur is recur context action method or generator method .clean is clean context action method .exit is exit context method .close is close context method .abort is abort context method Hidden: ._tymth is injected function wrapper closure returned by .tymen() of associated Tymist instance that returns Tymist .tyme. when called. ._tock is hidden attribute for .tock property """
[docs] def __init__(self, hab, client, direct=True, doers=None, **kwa): """ Initialize instance. Inherited Parameters: tymist is Tymist instance tock is float seconds initial value of .tock doers is list of doers (do generator instances, functions or methods) Parameters: hab is Habitat instance of local controller's context client is TCP Client instance direct is Boolean, True means direwct mode process cured receipts False means indirect mode don't process cue'ed receipts """ self.hab = hab self.client = client # use client for both rx and tx self.direct = True if direct else False self.kevery = Kevery(db=self.hab.db, lax=False, local=False, cloned=not self.direct, direct=self.direct) self.parser = parsing.Parser(ims=self.client.rxbs, framed=True, kvy=self.kevery, version=Vrsn_1_0) doers = doers if doers is not None else [] doers.extend([doing.doify(self.msgDo), doing.doify(self.escrowDo)]) if self.direct: doers.extend([doing.doify(self.cueDo)]) super(Indirector, self).__init__(doers=doers, **kwa) if self.tymth: self.client.wind(self.tymth)
[docs] def wind(self, tymth): """ Inject new tymist.tymth as new ._tymth. Changes tymist.tyme base. Updates winds .tymer .tymth """ super(Indirector, self).wind(tymth) self.client.wind(tymth)
[docs] def msgDo(self, tymth=None, tock=0.0, **kwa): """ Returns doifiable Doist compatibile generator method (doer dog) to process incoming message stream of .kevery Doist Injected Attributes: g.tock = tock # default tock attributes g.done = None # default done state g.opts Parameters: tymth is injected function wrapper closure returned by .tymen() of Tymist instance. Calling tymth() returns associated Tymist .tyme. tock is injected initial tock value Usage: add result of doify on this method to doers list """ self.wind(tymth) self.tock = tock _ = (yield self.tock) if self.parser.ims: logger.debug("Client %s received:\n%s\n...\n", self.hab.pre, self.parser.ims[:1024]) done = yield from self.parser.parsator(local=True) # process messages continuously return done # should nover get here except forced close
[docs] def cueDo(self, tymth=None, tock=0.0, **kwa): """ Returns doifiable Doist compatibile generator method (doer dog) to process .kevery.cues deque Doist Injected Attributes: g.tock = tock # default tock attributes g.done = None # default done state g.opts Parameters: tymth is injected function wrapper closure returned by .tymen() of Tymist instance. Calling tymth() returns associated Tymist .tyme. tock is injected initial tock value Usage: add result of doify on this method to doers list """ self.wind(tymth) self.tock = tock _ = (yield self.tock) while True: for msg in self.hab.processCuesIter(self.kevery.cues): self.sendMessage(msg, label="chit or receipt") yield # throttle just do one cue at a time yield
[docs] def escrowDo(self, tymth=None, tock=0.0, **kwa): """ Returns doifiable Doist compatibile generator method (doer dog) to process .kevery escrows. Doist Injected Attributes: g.tock = tock # default tock attributes g.done = None # default done state g.opts Parameters: tymth is injected function wrapper closure returned by .tymen() of Tymist instance. Calling tymth() returns associated Tymist .tyme. tock is injected initial tock value Usage: add result of doify on this method to doers list """ self.wind(tymth) self.tock = tock _ = (yield self.tock) while True: self.kevery.processEscrows() yield
[docs] def sendMessage(self, msg, label=""): """ Sends message msg and loggers label if any """ self.client.tx(msg) # send to remote logger.debug("%s sent %s:\n%s\n\n", self.hab.pre, label, bytes(msg))
[docs] class MailboxDirector(doing.DoDoer): """ Class for Indirect Mode KERI Controller Doer with habitat and TCP Clients for talking to witnesses Subclass of DoDoer with doers list from do generator methods: .msgDo, .cueDo, and .escrowDo. Enables continuous scheduling of doers (do generator instances or functions) Implements Doist like functionality to allow nested scheduling of doers. Each DoDoer runs a list of doers like a Doist but using the tyme from its injected tymist as injected by its parent DoDoer or Doist. Scheduling hierarchy: Doist->DoDoer...->DoDoer->Doers Attributes: .done is Boolean completion state: True means completed Otherwise incomplete. Incompletion maybe due to close or abort. .opts is dict of injected options for its generator .do .doers is list of Doers or Doer like generator functions Attributes: hby (Habitat: local controller's context Properties: hby (Habery): the Habery in which mailbox messages are routed verifier (Verifier): TEL event acceptor and validator exchanger (Exchanger): Exchange (exn) message delivery component rep (Respondant): Respondant for reply messages cues (Deck): Queue for new actions to schedule shared between the Revery, Kevery (and Kever), and Tevery (and Tever) Methods: .wind injects ._tymth dependency from associated Tymist to get its .tyme .__call__ makes instance callable Appears as generator function that returns generator .do is generator method that returns generator .enter is enter context action method .recur is recur context action method or generator method .clean is clean context action method .exit is exit context method .close is close context method .abort is abort context method Hidden: ._tymth is injected function wrapper closure returned by .tymen() of associated Tymist instance that returns Tymist .tyme. when called. ._tock is hidden attribute for .tock property """
[docs] def __init__(self, hby, topics, ims=None, verifier=None, kvy=None, exc=None, rep=None, cues=None, rvy=None, tvy=None, witnesses=True, **kwa): """ Initialize instance. Inherited Parameters: tymist is Tymist instance tock is float seconds initial value of .tock doers is list of doers (do generator instances, functions or methods) Parameters: hab is Habitat instance of local controller's context client is TCP Client instance direct is Boolean, True means direwct mode process cured receipts False means indirect mode don't process cue'ed receipts """ self.hby = hby self.verifier = verifier self.exchanger = exc self.rep = rep self.topics = topics self.pollers = list() self.prefixes = oset() self.cues = cues if cues is not None else decking.Deck() self.witnesses = witnesses self.ims = ims if ims is not None else bytearray() doers = [] doers.extend([doing.doify(self.pollDo), doing.doify(self.msgDo), doing.doify(self.escrowDo)]) self.rtr = routing.Router() self.rvy = rvy if rvy is not None else routing.Revery(db=self.hby.db, rtr=self.rtr, cues=cues, lax=True, local=False) # needs unique kevery with ims per remoter connnection self.kvy = kvy if kvy is not None else Kevery(db=self.hby.db, cues=self.cues, rvy=self.rvy, lax=True, local=False, direct=False) self.kvy.registerReplyRoutes(self.rtr) if self.verifier is not None: from ..vdr import Tevery # dynamic import because of circular import self.tvy = tvy if tvy is not None else Tevery(reger=self.verifier.reger, db=self.hby.db, rvy=self.rvy, lax=True, local=False, cues=self.cues) self.tvy.registerReplyRoutes(self.rtr) else: self.tvy = None self.parser = parsing.Parser(ims=self.ims, framed=True, kvy=self.kvy, tvy=self.tvy, exc=self.exchanger, rvy=self.rvy, vry=self.verifier, version=Vrsn_1_0) super(MailboxDirector, self).__init__(doers=doers, **kwa)
[docs] def wind(self, tymth): """ Inject new tymist.tymth as new ._tymth. Changes tymist.tyme base. Updates winds .tymer .tymth """ super(MailboxDirector, self).wind(tymth)
[docs] def pollDo(self, tymth=None, tock=0.0, **kwa): """ Returns: doifiable Doist compatible generator method Usage: add result of doify on this method to doers list """ # enter context self.wind(tymth) self.tock = tock _ = (yield self.tock) habs = list(self.hby.habs.values()) for hab in habs: if hab.accepted: self.addPollers(hab) _ = (yield self.tock) while True: pres = oset(self.hby.habs.keys()) if new := pres - self.prefixes: for pre in new: hab = self.hby.habs[pre] if hab.accepted: self.addPollers(hab=hab) _ = (yield self.tock) for msg in self.processPollIter(): self.ims.extend(msg) _ = (yield self.tock) _ = (yield self.tock)
[docs] def addPollers(self, hab): """ add mailbox pollers for every witness for this prefix identifier Parameters: hab (Hab): the Hab of the prefix """ for (_, erole, eid), end in hab.db.ends.getTopItemIter(keys=(hab.pre, Roles.mailbox)): if end.allowed: poller = Poller(hab=hab, topics=self.topics, witness=eid) self.pollers.append(poller) self.extend([poller]) if self.witnesses: wits = hab.kever.wits for wit in wits: poller = Poller(hab=hab, topics=self.topics, witness=wit) self.pollers.append(poller) self.extend([poller]) self.prefixes.add(hab.pre)
def addPoller(self, hab, witness): poller = Poller(hab=hab, topics=self.topics, witness=witness) self.pollers.append(poller) self.extend([poller])
[docs] def processPollIter(self): """ Iterate through cues and yields one or more responses for each cue. Parameters: cues is deque of cues """ mail = [] for poller in self.pollers: # get responses from all behaviors while poller.msgs: msg = poller.msgs.popleft() mail.append(msg) while mail: # iteratively process each response in responses msg = mail.pop(0) yield msg
[docs] def msgDo(self, tymth=None, tock=0.0, **kwa): """ Returns doifiable Doist compatibile generator method (doer dog) to process incoming message stream of .kevery Doist Injected Attributes: g.tock = tock # default tock attributes g.done = None # default done state g.opts Parameters: tymth is injected function wrapper closure returned by .tymen() of Tymist instance. Calling tymth() returns associated Tymist .tyme. tock is injected initial tock value Usage: add result of doify on this method to doers list """ self.wind(tymth) self.tock = tock _ = (yield self.tock) done = yield from self.parser.parsator(local=True) # process messages continuously return done # should nover get here except forced close
[docs] def escrowDo(self, tymth=None, tock=0.0, **kwa): """ Returns doifiable Doist compatibile generator method (doer dog) to process .kevery escrows. Doist Injected Attributes: g.tock = tock # default tock attributes g.done = None # default done state g.opts Parameters: tymth is injected function wrapper closure returned by .tymen() of Tymist instance. Calling tymth() returns associated Tymist .tyme. tock is injected initial tock value Usage: add result of doify on this method to doers list """ self.wind(tymth) self.tock = tock _ = (yield self.tock) while True: self.kvy.processEscrows() self.rvy.processEscrowReply() if self.exchanger is not None: self.exchanger.processEscrow() if self.tvy is not None: self.tvy.processEscrows() if self.verifier is not None: self.verifier.processEscrows() yield
@property def times(self): times = dict() for poller in self.pollers: # get responses from all pollers times |= poller.times return times
[docs] class Poller(doing.DoDoer): """ Polls remote SSE endpoint for event that are KERI messages to be processed """
[docs] def __init__(self, hab, witness, topics, msgs=None, retry=1000, **kwa): """ Returns doist compatible doing.Doer that polls a witness for mailbox messages as SSE events Parameters: hab: witness: topics: msgs: """ self.hab = hab self.pre = hab.pre self.witness = witness self.topics = topics self.retry = retry self.msgs = None if msgs is not None else decking.Deck() self.times = dict() doers = [doing.doify(self.eventDo)] super(Poller, self).__init__(doers=doers, **kwa)
[docs] def eventDo(self, tymth=None, tock=0.0, **kwa): """ Returns: doifiable Doist compatible generator method Usage: add result of doify on this method to doers list """ self.wind(tymth) self.tock = tock _ = (yield self.tock) witrec = self.hab.db.tops.get((self.pre, self.witness)) if witrec is None: witrec = TopicsRecord(topics=dict()) while self.retry > 0: try: client, clientDoer = httpClient(self.hab, self.witness) except MissingEntryError as e: traceback.print_exception(e, file=sys.stderr) # logging yield self.tock continue self.extend([clientDoer]) topics = dict() q = dict(pre=self.pre, topics=topics) for topic in self.topics: if topic in witrec.topics: topics[topic] = witrec.topics[topic] + 1 else: topics[topic] = 0 if isinstance(self.hab, GroupHab): msg = self.hab.mhab.query(pre=self.pre, src=self.witness, route="mbx", query=q) else: msg = self.hab.query(pre=self.pre, src=self.witness, route="mbx", query=q) createCESRRequest(msg, client, dest=self.witness) while client.requests: yield self.tock created = nowUTC() while True: now = nowUTC() if now - created > datetime.timedelta(seconds=30): self.remove([clientDoer]) break while client.events: evt = client.events.popleft() if "retry" in evt: self.retry = evt["retry"] if "id" not in evt or "data" not in evt or "name" not in evt: logger.error(f"bad mailbox event: {evt}") continue idx = evt["id"] msg = evt["data"] tpc = evt["name"] if not idx or not msg or not tpc: logger.error(f"bad mailbox event: {evt}") continue self.msgs.append(msg.encode("utf=8")) yield self.tock witrec.topics[tpc] = int(idx) self.times[tpc] = nowUTC() self.hab.db.tops.pin((self.pre, self.witness), witrec) yield 0.25 yield self.retry / 1000
[docs] class HttpEnd: """ HTTP handler that accepts and KERI events POSTed as the body of a request with all attachments to the message as a CESR attachment HTTP header. KEL Messages are processed and added to the database of the provided Habitat. This also handles `req`, `exn` and `tel` messages that respond with a KEL replay. """ TimeoutQNF = 30 TimeoutMBX = 5
[docs] def __init__(self, rxbs=None, mbx=None, qrycues=None): """ Create the KEL HTTP server from the Habitat with an optional Falcon App to register the routes with. Parameters rxbs (bytearray): output queue of bytes for message processing mbx (Mailboxer): Mailbox storage qrycues (Deck): inbound qry response queues """ self.rxbs = rxbs if rxbs is not None else bytearray() self.mbx = mbx self.qrycues = qrycues if qrycues is not None else decking.Deck()
[docs] def on_post(self, req, rep): """ Handles POST for KERI event messages. Parameters: req (Request) Falcon HTTP request rep (Response) Falcon HTTP response .. code-block:: none --- summary: Accept KERI events with attachment headers and parse description: Accept KERI events with attachment headers and parse. tags: - Events requestBody: required: true content: application/json: schema: type: object description: KERI event message responses: 200: description: Mailbox query response for server sent events 204: description: KEL or EXN event accepted. """ if req.method == "OPTIONS": rep.status = falcon.HTTP_200 return rep.set_header('Cache-Control', "no-cache") rep.set_header('connection', "close") cr = parseCesrHttpRequest(req=req) sadder = coring.Sadder(ked=cr.payload, kind=Kinds.json) msg = bytearray(sadder.raw) msg.extend(cr.attachments.encode("utf-8")) self.rxbs.extend(msg) if sadder.proto in ("ACDC",): rep.set_header('Content-Type', "application/json") rep.status = falcon.HTTP_204 else: ilk = sadder.ked["t"] if ilk in (Ilks.icp, Ilks.rot, Ilks.ixn, Ilks.dip, Ilks.drt, Ilks.exn, Ilks.rpy): rep.set_header('Content-Type', "application/json") rep.status = falcon.HTTP_204 elif ilk in (Ilks.vcp, Ilks.vrt, Ilks.iss, Ilks.rev, Ilks.bis, Ilks.brv): rep.set_header('Content-Type', "application/json") rep.status = falcon.HTTP_204 elif ilk in (Ilks.qry,): if sadder.ked["r"] in ("mbx",): rep.set_header('Content-Type', "text/event-stream") rep.status = falcon.HTTP_200 rep.stream = QryRpyMailboxIterable(mbx=self.mbx, cues=self.qrycues, said=sadder.said) else: rep.set_header('Content-Type', "application/json") rep.status = falcon.HTTP_204
[docs] def on_put(self, req, rep): """ Handles PUT for KERI mbx event messages. Parameters: req (Request) Falcon HTTP request rep (Response) Falcon HTTP response .. code-block:: none --- summary: Accept KERI events with attachment headers and parse description: Accept KERI events with attachment headers and parse. tags: - Events requestBody: required: true content: application/json: schema: type: object description: KERI event message responses: 200: description: Mailbox query response for server sent events 204: description: KEL or EXN event accepted. """ if req.method == "OPTIONS": rep.status = falcon.HTTP_200 return rep.set_header('Cache-Control', "no-cache") rep.set_header('connection', "close") self.rxbs.extend(req.bounded_stream.read()) rep.set_header('Content-Type', "application/json") rep.status = falcon.HTTP_204
class QryRpyMailboxIterable: def __init__(self, cues, mbx, said, retry=5000): self.mbx = mbx self.retry = retry self.cues = cues self.said = said self.iter = None def __iter__(self): return self def __next__(self): if self.iter is None: if self.cues: cue = self.cues.pull() serder = cue["serder"] if serder.said == self.said: kin = cue["kin"] if kin == "stream": self.iter = iter(MailboxIterable(mbx=self.mbx, pre=cue["pre"], topics=cue["topics"], retry=self.retry)) else: self.cues.append(cue) return b'' return next(self.iter) class MailboxIterable: TimeoutMBX = 30000000 def __init__(self, mbx, pre, topics, retry=5000): self.mbx = mbx self.pre = pre self.topics = topics self.retry = retry def __iter__(self): self.start = self.end = time.perf_counter() return self def __next__(self): if self.end - self.start < self.TimeoutMBX: if self.start == self.end: self.end = time.perf_counter() return bytearray(f"retry: {self.retry}\n\n".encode("utf-8")) data = bytearray() for topic, idx in self.topics.items(): key = self.pre + topic for fn, _, msg in self.mbx.cloneTopicIter(key, idx): data.extend(bytearray("id: {}\nevent: {}\nretry: {}\ndata: ".format(fn, topic, self.retry) .encode("utf-8"))) data.extend(msg) data.extend(b'\n\n') idx = idx + 1 self.start = time.perf_counter() self.topics[topic] = idx self.end = time.perf_counter() return data raise StopIteration
[docs] class ReceiptEnd(doing.DoDoer): """ Endpoint class for Witnessing receipting functionality Most times a witness will be able to return its receipt for an event inband. This API will provide that functionality. When an event needs to be escrowed, this POST API will return a 202 and also provides a generic GET API for retrieving a receipt for any event. """ def __init__(self, hab, inbound=None, outbound=None, aids=None): self.hab = hab self.inbound = inbound if inbound is not None else decking.Deck() self.outbound = outbound if outbound is not None else decking.Deck() self.aids = aids self.receipts = set() self.psr = parsing.Parser(framed=True, kvy=self.hab.kvy, version=Vrsn_1_0) super(ReceiptEnd, self).__init__(doers=[doing.doify(self.interceptDo)])
[docs] def on_post(self, req, rep): """ Receipt POST endpoint handler Parameters: req (Request): Falcon HTTP request object rep (Response): Falcon HTTP response object """ if req.method == "OPTIONS": rep.status = falcon.HTTP_200 return rep.set_header('Cache-Control', "no-cache") rep.set_header('connection', "close") cr = parseCesrHttpRequest(req=req) serder = serdering.SerderKERI(sad=cr.payload, kind=Kinds.json) pre = serder.ked["i"] if self.aids is not None and pre not in self.aids: raise falcon.HTTPBadRequest(description=f"invalid AID={pre} for witnessing receipting") ilk = serder.ked["t"] if ilk not in (Ilks.icp, Ilks.rot, Ilks.ixn, Ilks.dip, Ilks.drt): raise falcon.HTTPBadRequest(description=f"invalid event type ({ilk})for receipting") msg = bytearray(serder.raw) msg.extend(cr.attachments.encode("utf-8")) self.psr.parseOne(ims=msg, local=True) if pre in self.hab.kevers: kever = self.hab.kevers[pre] wits = kever.wits if self.hab.pre not in wits: raise falcon.HTTPBadRequest(description=f"{self.hab.pre} is not a valid witness for {pre} event at " f"{serder.sn}: wits={wits}") rct = self.hab.receipt(serder, framed=True, version=serder.pvrsn, kind=serder.kind) self.psr.parseOne(bytes(rct)) rep.set_header('Content-Type', CESR_CONTENT_TYPE) rep.status = falcon.HTTP_200 rep.data = rct else: rep.status = falcon.HTTP_202
[docs] def on_get(self, req, rep): """ Receipt GET endpoint handler Parameters: req (Request): Falcon HTTP request object rep (Response): Falcon HTTP response object """ pre = req.get_param("pre") sn = req.get_param_as_int("sn") said = req.get_param("said") if pre is None: raise falcon.HTTPBadRequest(description="query param 'pre' is required") preb = pre.encode("utf-8") if sn is None and said is None: raise falcon.HTTPBadRequest(description="either 'sn' or 'said' query param is required") if sn is not None: said = self.hab.db.kels.getLast(keys=preb, on=sn) if said is None: raise falcon.HTTPNotFound(description=f"event for {pre} at {sn} ({said}) not found") said = said.encode("utf-8") if not (serder := self.hab.db.evts.get(keys=(preb, said))): raise falcon.HTTPNotFound(description="Missing event for dig={}.".format(said)) if serder.sn > 0: wits = [wit.qb64 for wit in self.hab.kvy.fetchWitnessState(pre, serder.sn)] else: wits = serder.ked["b"] if self.hab.pre not in wits: raise falcon.HTTPBadRequest(description=f"{self.hab.pre} is not a valid witness for {pre} event at " f"{serder.sn}, {wits}") rserder = receipt(pre=pre, sn=sn, said=said.decode("utf-8"), version=serder.pvrsn, kind=serder.kind) rct = bytearray(rserder.raw) if wigers := self.hab.db.wigs.get(keys=(preb, said)): rct.extend(Counter(Codens.WitnessIdxSigs, count=len(wigers), version=serder.pvrsn).qb64b) for wiger in wigers: rct.extend(wiger.qb64b) rep.set_header('Content-Type', CESR_CONTENT_TYPE) rep.status = falcon.HTTP_200 rep.data = rct
[docs] def interceptDo(self, tymth=None, tock=0.0, **kwa): """ Returns doifiable Doist compatibile generator method (doer dog) to process Kevery and Tevery cues deque Usage: add result of doify on this method to doers list """ # enter context self.wind(tymth) self.tock = tock _ = (yield self.tock) while True: while self.inbound: # iteratively process each cue in cues cue = self.inbound.popleft() cueKin = cue["kin"] # type or kind of cue if cueKin in ("receipt",): # cue to receipt a received event from other pre serder = cue["serder"] # Serder of received event for other pre if serder.saidb in self.receipts: self.receipts.remove(serder.saidb) else: self.outbound.append(cue) else: self.outbound.append(cue) yield self.tock yield self.tock
[docs] class QueryEnd: """ Endpoint class for quering witness for KELs and TELs using HTTP GET """ def __init__(self, hab, reger): self.hab = hab self.reger = reger
[docs] def on_get(self, req, rep): """ Handles GET requests to query KEL or TEL events of a pre from a witness. Parameters: req (Request) Falcon HTTP request rep (Response) Falcon HTTP response Query Parameters: typ (string): The type of event data to query for. Accepted values are: - 'kel': Retrieve KEL events for a specified 'pre'. - 'tel': Retrieve TEL events based on 'reg' or 'vcid'. pre (string, optional): For 'kel' queries, the specific 'pre' to query. sn (int, optional): For "kel" queries. If provided, returns events with seq-num >= sn. reg (string, optional): For 'tel' queries, registry pre. Required if vcid not provided. vcid (string, optional): For 'tel' queries, credential said. Required if reg not provided. Response: - 200 OK: Returns event data in "application/cesr" format. - 400 Bad Request: Returned if required query parameters are missing or if an invalid `typ` is specified. Example: - /query?typ=kel&pre=ELZ1KBCFOmdj1RPu6kMUnzgMBTl4YsHfpw7wIGvLgW5W - /query?typ=kel&pre=ELZ1KBCFOmdj1RPu6kMUnzgMBTl4YsHfpw7wIGvLgW5W&sn=5 - /query?typ=tel&reg=EHrbPfpRLU9wpFXTzGY-LIo2FjMiljjEnt238eWHb7yZ&vcid=EO5y0jMXS5XKTYBKjCUPmNKPr1FWcWhtKwB2Go2ozvr0 """ typ = req.get_param("typ") if not typ: raise falcon.HTTPBadRequest(description="'typ' query param is required") if typ == "kel": pre = req.get_param("pre") if not pre: raise falcon.HTTPBadRequest(description="'pre' query param is required") evnts = bytearray() sn = req.get_param_as_int("sn") if sn is not None: ## query for event with seq-num >= sn dig = self.hab.db.kels.getLast(keys=pre, on=sn) if dig is None: raise falcon.HTTPBadRequest(description=f"non-existant event at seq-num {sn}") for dig in self.hab.db.kels.getAllIter(keys=pre, on=sn): try: dig = dig.encode("utf-8") msg = self.hab.db.cloneEvtMsg(pre=pre, fn=0, dig=dig) except Exception: continue # skip this event evnts.extend(msg) else: for msg in self.hab.db.clonePreIter(pre=pre): evnts.extend(msg) rep.set_header('Content-Type', CESR_CONTENT_TYPE) rep.status = falcon.HTTP_200 rep.data = bytes(evnts) elif typ == "tel": regk = req.get_param("reg") vcid = req.get_param("vcid") if not regk and not vcid: raise falcon.HTTPBadRequest(description="Either 'reg' or 'vcid' query param is required for TEL query") evnts = bytearray() if regk is not None: cloner = self.reger.clonePreIter(pre=regk) for msg in cloner: evnts.extend(msg) if vcid is not None: cloner = self.reger.clonePreIter(pre=vcid) for msg in cloner: evnts.extend(msg) rep.set_header('Content-Type', CESR_CONTENT_TYPE) rep.status = falcon.HTTP_200 rep.data = bytes(evnts) else: rep.set_header('Content-Type', "application/json") rep.text = "unkown query type." rep.status = falcon.HTTP_400