Source code for keri.app.agenting

# -*- encoding: utf-8 -*-
"""
KERI
keri.app.agenting module

"""
import random
from urllib.parse import urlparse, urljoin

from hio.base import doing
from hio.core import http
from hio.core.tcp import clienting
from hio.help import decking, Hict, ogler

from socket import gaierror

from .httping import Clienter, streamCESRRequests, CESR_DESTINATION_HEADER

from ..kering import (Schemes, Roles, Vrsn_1_0,
                      MissingEntryError, ConfigurationError,
                      MissingEntryError)
from ..core import Counter, eventing, parsing, coring, serdering, Codens


logger = ogler.getLogger()


[docs] class Receiptor(doing.DoDoer): """ Receiptor is a parent task orchestrating both initial receipt retrieval of KEL events and subsequent retrieval of receipts for specific events based on queries. """
[docs] def __init__(self, hby, msgs=None, gets=None, cues=None): """ Initialize the Receiptor and create doers for processing and retrieving witness receipts. Parameters: hby (Habery): Habery to pull identifiers and witnesses for those identifiers from msgs (Deck): KEL event messages to publish to witnesses for receipting Messages should have {"pre": <str>, "sn": <int>, "auths": <dict>} gets (Deck): query messages of KEL events to retrieve receipts from witnesses for Messages should have {"pre": <str>, "sn": <int>} cues (Deck): outgoing cues of successful messages; currently the messages placed here are not used """ self.msgs = msgs if msgs is not None else decking.Deck() self.gets = gets if gets is not None else decking.Deck() self.cues = cues if cues is not None else decking.Deck() self.clienter = Clienter() doers = [self.clienter, doing.doify(self.witDo), doing.doify(self.gitDo)] self.hby = hby super(Receiptor, self).__init__(doers=doers)
[docs] def receipt(self, pre, sn=None, auths=None): """Returns a generator performing witness receipting of KEL events. The returns a generator that will submit the designated event to witnesses for receipts using the synchronous witness API, then propagate the receipts to each of the other witnesses. Delegates to .catchup to catch up any new witnesses to the current state of the KEL. Parameters: pre (str): qualified base64 identifier to gather receipts for sn: (Optiona[int]): sequence number of event to gather receipts for, latest is used if not provided auths: (Options[dict]): map of witness AIDs to (time,auth) tuples for providing TOTP auth for witnessing Returns: list: identifiers of witnesses that returned receipts. """ auths = auths if auths is not None else dict() if pre not in self.hby.prefixes: raise MissingEntryError(f"{pre} not a valid AID") hab = self.hby.habs[pre] sn = sn if sn is not None else hab.kever.sner.num wits = hab.kever.wits if len(wits) == 0: return msg = hab.msgOwnEvent(sn=sn, framed=True) ser = serdering.SerderKERI(raw=msg) # If we are a rotation event, may need to catch new witnesses up to current key state if ser.ked['t'] in (coring.Ilks.rot,): adds = ser.ked["ba"] for wit in adds: yield from self.catchup(ser.pre, wit) clients = dict() doers = [] for wit in wits: try: client, clientDoer = httpClient(hab, wit) clients[wit] = client doers.append(clientDoer) self.extend([clientDoer]) except (MissingEntryError, gaierror) as e: logger.error(f"unable to create http client for witness {wit}: {e}") # send to each witness and gather receipts rcts = dict() for wit, client in clients.items(): headers = dict() if wit in auths: headers["Authorization"] = auths[wit] streamCESRRequests(client=client, dest=wit, ims=bytearray(msg), path="/receipts", headers=headers) while not client.responses: yield self.tock rep = client.respond() if rep.status == 200: rct = bytearray(rep.body) hab.psr.parseOne(bytearray(rct)) rserder = serdering.SerderKERI(raw=rct) del rct[:rserder.size] # pull off the count code Counter(qb64b=rct, strip=True, version=Vrsn_1_0) rcts[wit] = rct else: print(f"invalid response {rep.status} from witnesses {wit}") # send retrieved receipts to all other witnesses for wit in rcts: ewits = [w for w in rcts if w != wit] # get complement of all other witnesses wigers = [rcts[w] for w in ewits] # all other witness signatures msg = bytearray() if ser.ked['t'] in (coring.Ilks.icp, coring.Ilks.dip): # introduce new witnesses msg.extend(schemes(self.hby.db, eids=ewits)) elif ser.ked['t'] in (coring.Ilks.rot, coring.Ilks.drt) and \ ("ba" in ser.ked and wit in ser.ked["ba"]): # Newly added witness, introduce to all msg.extend(schemes(self.hby.db, eids=ewits)) rserder = eventing.receipt(pre=hab.pre, sn=sn, said=ser.said, version=ser.pvrsn, kind=ser.kind) msg.extend(rserder.raw) msg.extend(Counter(Codens.NonTransReceiptCouples, count=len(wigers), version=Vrsn_1_0).qb64b) for wiger in wigers: msg.extend(wiger) client = clients[wit] sent = streamCESRRequests(client=client, dest=wit, ims=bytearray(msg)) while len(client.responses) < sent: yield self.tock self.remove(doers) return rcts.keys()
[docs] def get(self, pre, sn=None): """ Queries a random witness for the receipt of the event at the sequence number for a prefix. Returns: a generator requesting receipts for event identified by pre and sn Parameters: pre (str): qualified base64 identifier to gather a receipt for sn: (Optiona[int]): sequence number of event to gather receipts for, latest is used if not provided """ if pre not in self.hby.prefixes: raise MissingEntryError(f"{pre} not a valid AID") hab = self.hby.habs[pre] sn = sn if sn is not None else hab.kever.sner.num wits = hab.kever.wits if len(wits) == 0: return wit = random.choice(hab.kever.wits) urls = hab.fetchUrls(eid=wit, scheme=Schemes.https) or hab.fetchUrls(eid=wit, scheme=Schemes.http) if not urls: raise MissingEntryError(f"unable to query witness {wit}, no http endpoint") base = urls[Schemes.https] if Schemes.https in urls else urls[Schemes.http] url = urljoin(base, f"/receipts?pre={pre}&sn={sn}") client = self.clienter.request("GET", url) while not client.responses: yield self.tock rep = client.respond() if rep.status == 200: rct = bytearray(rep.body) hab.psr.parseOne(bytearray(rct)) self.clienter.remove(client) return rep.status == 200
[docs] def catchup(self, pre, wit): """ A generator that catches up a target witness with the latest KEL state of a prefix. When adding a new Witness use this method to catch the witness up to the current state of the KEL Returns: a generator that sends the KEL to the witness Parameters: pre (str): qualified base64 AID of the KEL to send wit (str): qualified base64 AID of the witness to send the KEL to """ if pre not in self.hby.prefixes: raise MissingEntryError(f"{pre} not a valid AID") hab = self.hby.habs[pre] client, clientDoer = httpClient(hab, wit) self.extend([clientDoer]) for fmsg in hab.db.clonePreIter(pre=pre): streamCESRRequests(client=client, dest=wit, ims=bytearray(fmsg)) while not client.responses: yield self.tock self.remove([clientDoer])
[docs] def witDo(self, tymth=None, tock=0.0, **kwa): """ A generator that obtains witness receipts for one key event message at a tyme by processing all messages in the .msgs buffer and adds processed messages to cues. Catches up any new witnesses in "adds" to the current state of the KEL using self.catchup. Intended to be used with doify to create a generator Doer. Delegates to the internal receipt generator function. Returns: a Hio generator function to be used as a Doer. Parameters: tymth (function): function returning cycle time for configuring this Doer's cycle time. tock (float): cycle time for this Doer, default is 0.0 seconds. Usage: add result of doify on this method to doers list """ self.wind(tymth) self.tock = tock _ = (yield self.tock) while True: while self.msgs: msg = self.msgs.popleft() pre = msg["pre"] sn = msg["sn"] if "sn" in msg else None auths = msg["auths"] if "auths" in msg else None yield from self.receipt(pre, sn, auths) self.cues.push(msg) yield self.tock
[docs] def gitDo(self, tymth=None, tock=0.0, **kwa): """ A generator that obtains a witness receipts for key event messages specified in the query messages in the .gets buffer Intended to be used with doify to create a generator Doer. Returns: a Hio generator function to be used as a Doer. Parameters: tymth (function): function returning cycle time for configuring this Doer's cycle time. tock (float): cycle time for this Doer, default is 0.0 seconds. Usage: add result of doify on this method to doers list """ self.wind(tymth) self.tock = tock _ = (yield self.tock) while True: while self.gets: msg = self.gets.popleft() pre = msg["pre"] sn = msg["sn"] if "sn" in msg else None yield from self.get(pre, sn) yield self.tock
[docs] class WitnessReceiptor(doing.DoDoer): """ Sends receipt messages to all current witnesses of given identifier (from hab), waits for receipts from each of those witnesses, and propagates those receipts to each of the other witnesses after receiving the complete set. Removes all Doers and exits as Done once all witnesses have been sent the entire receipt set. Could be enhanced to have a `once` method that runs once and cleans up and an `all` method that runs and waits for more messages to receipt. """
[docs] def __init__(self, hby, msgs=None, cues=None, force=False, auths=None, **kwa): """ For the current event, gather the current set of witnesses, send the event, gather all receipts and send them to all other witnesses Parameters: hby (Habery): Habitat of the identifier to receipt witnesses msgs (Deck): events to send the event and receipt to all witnesses Messages should have {"pre": <str>, "sn": <int>, "auths": <dict>} cues (Deck): outgoing cues of events confirmed as fully receipted Messages have {"pre": <str>, "sn": <int>, "auths": <dict>} force (bool): True means to send witnesses all receipts even if we have a full complement. auths (dict): map of witness AIDs to (time,auth) tuples for providing TOTP auth for witnessing """ self.hby = hby self.force = force self.msgs = msgs if msgs is not None else decking.Deck() self.cues = cues if cues is not None else decking.Deck() self.auths = auths if auths is not None else dict() super(WitnessReceiptor, self).__init__(doers=[doing.doify(self.receiptDo)], **kwa)
[docs] def receiptDo(self, tymth=None, tock=0.0, **kwa): """ Sends events, their receipts, receipt signatures, delegation chain, and location record URLs between witnesses in the set of current witnesses. Returns: a doifiable Hio generator to perform event and receipt sending. Usage: add result of doify on this method to doers list Parameters: tymth (function): function returning cycle time for configuring this Doer's cycle time. tock (float): cycle time for this Doer, default is 0.0 seconds. """ self.wind(tymth) self.tock = tock _ = (yield self.tock) while True: while self.msgs: evt = self.msgs.popleft() pre = evt["pre"] if pre not in self.hby.habs: continue hab = self.hby.habs[pre] sn = evt["sn"] if "sn" in evt else hab.kever.sner.num wits = hab.kever.wits if len(wits) == 0: continue msg = hab.msgOwnEvent(sn=sn, framed=True) ser = serdering.SerderKERI(raw=msg) witers = [] for wit in wits: auth = self.auths[wit] if wit in self.auths else None witer = messenger(hab, wit, auth=auth) witers.append(witer) self.extend([witer]) # Check to see if we already have all the receipts we need for this event wigers = hab.db.wigs.get(keys=(ser.preb, ser.saidb)) completed = len(wigers) == len(wits) if len(wigers) != len(wits): # We have all the receipts, skip for idx, witer in enumerate(witers): wit = wits[idx] for dmsg in hab.db.cloneDelegation(hab.kever): witer.msgs.append(bytearray(dmsg)) if ser.ked['t'] in (coring.Ilks.icp, coring.Ilks.dip) or \ "ba" in ser.ked and wit in ser.ked["ba"]: # Newly added witness, must send full KEL to catch up for fmsg in hab.db.clonePreIter(pre=pre): witer.msgs.append(bytearray(fmsg)) witer.msgs.append(bytearray(msg)) # make a copy _ = (yield self.tock) while True: wigers = hab.db.wigs.get(keys=(ser.preb, ser.saidb)) if len(wigers) == len(wits): break _ = yield self.tock # If we started with all our receipts, exit unless told to force resubmit of all receipts if completed and not self.force: self.cues.push(evt) continue # generate all rct msgs to send to all witnesses awigers = wigers # make sure all witnesses have fully receipted KERL and know about each other for witer in witers: ewits = [] wigers = [] for i, wit in enumerate(wits): if wit == witer.wit: continue ewits.append(wit) wigers.append(awigers[i]) if len(wigers) == 0: continue rctMsg = bytearray() # Now that the witnesses have not met each other, send them each other's receipts if ser.ked['t'] in (coring.Ilks.icp, coring.Ilks.dip): # introduce new witnesses rctMsg.extend(schemes(self.hby.db, eids=ewits)) elif ser.ked['t'] in (coring.Ilks.rot, coring.Ilks.drt) and \ ("ba" in ser.ked and witer.wit in ser.ked["ba"]): # Newly added witness, introduce to all rctMsg.extend(schemes(self.hby.db, eids=ewits)) rserder = eventing.receipt(pre=ser.pre, sn=sn, said=ser.said, version=ser.pvrsn, kind=ser.kind) rctMsg.extend(eventing.messagize(serder=rserder, wigers=wigers, framed=True)) witer.msgs.append(rctMsg) _ = (yield self.tock) while True: done = True for witer in witers: if not witer.idle: yield 1.0 done = False break if done: break self.remove(witers) self.cues.push(evt) yield self.tock yield self.tock
[docs] class WitnessInquisitor(doing.DoDoer): """ Sends KEL and TEL query messages to different types of targets whether witnesses, controllers, or agents based on the locally available endpoint role records for the query target. Queries are performed by sending query messages either to a random witness or to the controller or agent that is the target of the query. Removes all Doers and exits as Done once the query target has been sent the query message TODO: possibly rename based on the fact that multiple types of targets are supported (controller, agent, witness) """
[docs] def __init__(self, hby, msgs=None, klas=None, **kwa): """ Initialize the WitnessInquisitor with the given parameters. Parameters: hby (Habery): Habery context to use to retrieve the source Hab for reading endpoint role records klas (class): Type of messenger to use to send messages; defaults to HTTPMessenger; currently unused msgs (decking.Deck): query message buffer to be sent to the target or a random witness Attributes: hby (Habery): Habery context to use to retrieve the source Hab for reading endpoint role records klas (class): Type of messenger to use to send messages; defaults to HTTPMessenger; currently unused msgs (decking.Deck): query message buffer to be sent to the target or a random witness sent (decking.Deck): buffer for sent messages to track sent queries """ self.hby = hby self.klas = klas if klas is not None else HTTPMessenger self.msgs = msgs if msgs is not None else decking.Deck() self.sent = decking.Deck() super(WitnessInquisitor, self).__init__(doers=[doing.doify(self.msgDo)], **kwa)
[docs] def msgDo(self, tymth=None, tock=1.0, **opts): """ Signs and sends provided KEL and TEL query messages to query targets using HTTPMessenger or TCPMessenger. Uses a randomly selected witness if the query target is a witness, or uses the specified controller or agent when a controller or agent is specified. Returns a Hio generator function that runs until all messages in .msgs are processed. Usage: add result of doify on this method to doers list """ from .forwarding import introduce self.wind(tymth) self.tock = tock _ = (yield self.tock) while True: while not self.msgs: yield self.tock evt = self.msgs.popleft() pre = evt["pre"] target = evt["target"] src = evt["src"] r = evt["r"] q = evt["q"] wits = evt["wits"] if "wits" in evt else None if "hab" in evt: hab = evt["hab"] elif (hab := self.hby.habByPre(src)) is None: continue if not wits and pre not in self.hby.kevers: logger.error(f"must have KEL for identifier to query {pre}") continue if not wits: ends = hab.endsFor(pre=pre) if Roles.controller in ends: end = ends[Roles.controller] elif Roles.agent in ends: end = ends[Roles.agent] elif Roles.witness in ends: end = ends[Roles.witness] else: logger.error(f"unable query: can not find a valid role for {pre}") continue if len(end.items()) == 0: logger.error(f"must have endpoint to query for pre={pre}") continue ctrl, locs = random.choice(list(end.items())) if len(locs.items()) == 0: logger.error(f"must have location in endpoint to query for pre={pre}") continue witer = messengerFrom(hab=hab, pre=ctrl, urls=locs) else: wit = random.choice(wits) witer = messenger(hab, wit) self.extend([witer]) msg = hab.query(target, src=witer.wit, route=r, query=q) # Query for remote pre Event kel = introduce(hab, witer.wit) if kel: witer.msgs.append(bytearray(kel)) witer.msgs.append(bytearray(msg)) while not witer.sent: yield self.tock self.sent.append(witer.sent.popleft()) yield self.tock
[docs] def query(self, pre, r="logs", sn='0', fn='0', src=None, hab=None, anchor=None, wits=None, **kwa): """ Create a KEL query (`qry`) message against the attester for the prefix (`pre`) and place on the internal .msgs queue for processing by the .msgDo doer. May also contain an anchor to use to locate a key event. May specify the hab to use for signing and retrieving endpoint role records. Parameters: pre (str): qb64 identifier prefix being queried for r (str): query route sn (str): optional specific hex str of sequence number to query for fn (str): optional specific hex str of sequence number to start with src (str): qb64 identifier prefix of source of query hab (Hab): Hab to use instead of src, if provided, to retrieve endpoint role records from and to perform signing anchor (Seal): anchored Seal to search for in the query target wits (list) witnesses to query """ qry = dict(s=sn, fn=fn) if anchor is not None: qry["a"] = anchor msg = dict(src=src, pre=pre, target=pre, r=r, q=qry, wits=wits) if hab is not None: msg["hab"] = hab self.msgs.append(msg)
[docs] def telquery(self, ri, src=None, i=None, r="tels", hab=None, pre=None, wits=None, **kwa): """ Create a TEL Query message to search against a given registry, issuer, route, in the target prefixe's (`pre`) records, and add that query message on the internal .msgs queue. May specify the hab to use for signing and retrieving endpoint role records. Parameters: ri (str): qb64 identifier prefix of the registry being queried src (str): qb64 identifier prefix of source of query i (str): qb64 identifier prefix of the issuer of the registry being queried r (str): query route hab (Hab): Hab to use instead of src, if provided, to retrieve endpoint role records from and to perform signing pre (str): qb64 identifier prefix of the target being queried wits (list): witnesses to query """ qry = dict(ri=ri) msg = dict(src=src, pre=pre, target=i, r=r, wits=wits, q=qry) if hab is not None: msg["hab"] = hab self.msgs.append(msg)
[docs] class WitnessPublisher(doing.DoDoer): """ Sends messages to all current witnesses of given identifier (from hab) and exits. Removes all Doers and exits as Done once all witnesses have been sent the message. Could be enhanced to have a `once` method that runs once and cleans up and an `all` method that runs and waits for more messages to receipt. """
[docs] def __init__(self, hby, msgs=None, cues=None, **kwa): """Initialize with publish queue (msgs) and completion cues. Parameters: hby (Habery): Habitat of the identifier to populate witnesses msgs (Deck): incoming messages to publish to witnesses cues (Deck): outgoing cues of successful messages """ self.hby = hby self.posted = 0 self.msgs = msgs if msgs is not None else decking.Deck() self.cues = cues if cues is not None else decking.Deck() super(WitnessPublisher, self).__init__(doers=[doing.doify(self.sendDo)], **kwa)
[docs] def sendDo(self, tymth=None, tock=0.0, **opts): """Doer loop that sends queued messages to each witness. Pushes the original request to self.cues to signal completion """ self.wind(tymth) self.tock = tock _ = (yield self.tock) while True: while self.msgs: evt = self.msgs.popleft() self.posted += 1 pre = evt["pre"] msg = evt["msg"] if pre not in self.hby.habs: continue hab = self.hby.habs[pre] wits = hab.kever.wits witers = [] for wit in wits: witer = messenger(hab, wit) witers.append(witer) witer.msgs.append(bytearray(msg)) # make a copy so everyone munges their own self.extend([witer]) _ = (yield self.tock) while witers: witer = witers.pop() while not witer.idle: _ = (yield self.tock) self.remove(witers) self.cues.push(evt) yield self.tock yield self.tock
[docs] def sent(self, said): """ Check if message with given SAID was sent Parameters: said (str): qb64 SAID of message to check for """ for cue in self.cues: if cue["said"] == said: return True return False
@property def idle(self): return len(self.msgs) == 0 and self.posted == len(self.cues)
[docs] class TCPMessenger(doing.DoDoer): """Send outbound CESR messages to a witness via TCP and parse inbound receipts."""
[docs] def __init__(self, hab, wit, url, msgs=None, sent=None, doers=None, **kwa): """Initialize TCP messenger with queues and parser wiring. Parameters: hab (Hab): habitat for KEL parsing and db access. wit (str): qb64 witness identifier. url (str): tcp endpoint URL for the witness. msgs (Deck | None): outbound message queue. sent (Deck | None): sent message queue. """ self.hab = hab self.wit = wit self.url = url self.posted = 0 self.msgs = msgs if msgs is not None else decking.Deck() self.sent = sent if sent is not None else decking.Deck() self.parser = None doers = doers if doers is not None else [] doers.extend([doing.doify(self.receiptDo)]) self.kevery = eventing.Kevery(db=self.hab.db, **kwa) super(TCPMessenger, self).__init__(doers=doers)
[docs] def receiptDo(self, tymth=None, tock=0.0, **kwa): """Doer loop that sends queued messages over TCP.""" self.wind(tymth) self.tock = tock _ = (yield self.tock) up = urlparse(self.url) if up.scheme != Schemes.tcp: raise ValueError(f"invalid scheme {up.scheme} for TcpWitnesser") client = clienting.Client(host=up.hostname, port=up.port) self.parser = parsing.Parser(ims=client.rxbs, framed=True, kvy=self.kevery, version=Vrsn_1_0) clientDoer = clienting.ClientDoer(client=client) self.extend([clientDoer, doing.doify(self.msgDo)]) while True: while not self.msgs: yield self.tock msg = self.msgs.popleft() self.posted += 1 client.tx(msg) # send to connected remote while client.txbs: yield self.tock self.sent.append(msg) yield self.tock
[docs] def msgDo(self, tymth=None, tock=0.0, **opts): """Doer loop that parses inbound TCP messages into the Kevery.""" yield from self.parser.parsator(local=True) # process messages continuously
@property def idle(self): return len(self.sent) == self.posted
[docs] class TCPStreamMessenger(doing.DoDoer): """Stream a CESR message to a witness via TCP and parse inbound receipts."""
[docs] def __init__(self, hab, wit, url, msgs=None, sent=None, doers=None, **kwa): """Initialize TCP stream messenger with queues and parser wiring. Parameters: hab (Hab): habitat for KEL parsing and db access. wit (str): qb64 witness identifier. url (str): tcp endpoint URL for the witness. msgs (Deck | None): outbound message queue. sent (Deck | None): sent message queue. """ self.hab = hab self.wit = wit self.url = url self.posted = 0 self.msgs = msgs if msgs is not None else decking.Deck() self.sent = sent if sent is not None else decking.Deck() self.parser = None doers = doers if doers is not None else [] doers.extend([doing.doify(self.receiptDo)]) self.kevery = eventing.Kevery(db=self.hab.db, **kwa) super(TCPStreamMessenger, self).__init__(doers=doers)
[docs] def receiptDo(self, tymth=None, tock=0.0, **kwa): """Doer loop that sends queued messages over TCP. Pushes the original request to self.sent to signal completion """ self.wind(tymth) self.tock = tock _ = (yield self.tock) up = urlparse(self.url) if up.scheme != Schemes.tcp: raise ValueError(f"invalid scheme {up.scheme} for TcpWitnesser") client = clienting.Client(host=up.hostname, port=up.port) self.parser = parsing.Parser(ims=client.rxbs, framed=True, kvy=self.kevery, version=Vrsn_1_0) clientDoer = clienting.ClientDoer(client=client) self.extend([clientDoer, doing.doify(self.msgDo)]) while True: while not self.msgs: yield self.tock msg = self.msgs.popleft() self.posted += 1 client.tx(msg) # send to connected remote while client.txbs: yield self.tock self.sent.append(msg) yield self.tock
[docs] def msgDo(self, tymth=None, tock=0.0, **opts): """Doer loop that parses inbound TCP messages into the Kevery.""" yield from self.parser.parsator(local=True) # process messages continuously
@property def idle(self): return len(self.sent) == self.posted
[docs] class HTTPMessenger(doing.DoDoer): """Send CESR messages to a witness over HTTP and capture responses."""
[docs] def __init__(self, hab, wit, url, msgs=None, sent=None, doers=None, auth=None, **kwa): """Initialize HTTP messenger with queues and optional auth. Parameters: hab (Hab): habitat for KEL parsing and db access. wit (str): qb64 witness identifier. url (str): http/https endpoint URL for the witness. msgs (Deck | None): outbound message queue. sent (Deck | None): response queue. auth (str | None): optional 2FA auth codes for witnesses. """ self.hab = hab self.wit = wit self.posted = 0 self.msgs = msgs if msgs is not None else decking.Deck() self.sent = sent if sent is not None else decking.Deck() self.parser = None self.auth = auth doers = doers if doers is not None else [] doers.extend([doing.doify(self.msgDo), doing.doify(self.responseDo)]) up = urlparse(url) if up.scheme != Schemes.http and up.scheme != Schemes.https: raise ValueError(f"invalid scheme {up.scheme} for HTTPMessenger") self.client = http.clienting.Client(scheme=up.scheme, hostname=up.hostname, port=up.port) clientDoer = http.clienting.ClientDoer(client=self.client) doers.extend([clientDoer]) super(HTTPMessenger, self).__init__(doers=doers, **kwa)
[docs] def msgDo(self, tymth=None, tock=0.0, **kwa): """Doer loop that sends queued messages over HTTP.""" self.wind(tymth) self.tock = tock _ = (yield self.tock) while True: while not self.msgs: yield self.tock msg = self.msgs.popleft() headers = dict() if self.auth is not None: headers["Authorization"] = self.auth self.posted += streamCESRRequests(client=self.client, dest=self.wit, ims=msg, headers=headers) while self.client.requests: yield self.tock yield self.tock
[docs] def responseDo(self, tymth=None, tock=0.0, **kwa): """Doer loop that processes HTTP responses from the client and adds them into `sent` cues.""" self.wind(tymth) self.tock = tock _ = (yield self.tock) while True: while self.client.responses: rep = self.client.respond() self.sent.append(rep) yield yield
@property def idle(self): return len(self.msgs) == 0 and self.posted == len(self.sent)
[docs] class HTTPStreamMessenger(doing.DoDoer): """Stream a single CESR message via HTTP PUT and capture the response."""
[docs] def __init__(self, hab, wit, url, msg=b'', headers=None, **kwa): """Initialize a single-request HTTP messenger and add the HTTP client Doer to this DoDoer. Parameters: hab (Hab): habitat for KEL parsing and db access. wit (str): qb64 witness identifier. url (str): http/https endpoint URL for the witness. msg (bytes): CESR message body to send. headers (dict | None): extra HTTP headers. """ self.hab = hab self.wit = wit self.rep = None headers = headers if headers is not None else {} up = urlparse(url) if up.scheme != Schemes.http and up.scheme != Schemes.https: raise ValueError(f"invalid scheme {up.scheme} for HTTPMessenger") self.client = http.clienting.Client(scheme=up.scheme, hostname=up.hostname, port=up.port) clientDoer = http.clienting.ClientDoer(client=self.client) headers = Hict([ ("Content-Type", "application/cesr"), ("Content-Length", len(msg)), (CESR_DESTINATION_HEADER, self.wit), ] + list(headers.items())) self.client.request( method="PUT", path="/", headers=headers, body=bytes(msg) ) doers = [clientDoer] super(HTTPStreamMessenger, self).__init__(doers=doers, **kwa)
[docs] def recur(self, tyme, deeds=None): """Poll for a response and stop once received.""" if self.client.responses: self.rep = self.client.respond() self.remove([self.client]) return True return super(HTTPStreamMessenger, self).recur(tyme, deeds)
[docs] def mailbox(hab, cid): """ Finds and returns a mailbox, if any, based on the provided Mab and controller AID (cid). Returns: str | None: qb64 identifier prefix of the mailbox to use for the controller, or None if no mailbox found. Parameters: hab (Hab): Hab to use to look up witness URLs cid (str): qb64 identifier prefix of controller to find mailbox for """ for (_, erole, eid), end in hab.db.ends.getTopItemIter(keys=(cid, Roles.mailbox)): if end.allowed: return eid if cid not in hab.kevers: return None kever = hab.kevers[cid] if not kever.wits: return None mbx = random.choice(kever.wits) return mbx
[docs] def messenger(hab, pre, auth=None): """ Create a Messenger (tcp or http) based on available endpoints Parameters: hab (Habitat): Environment to use to look up witness URLs pre (str): qb64 identifier prefix of recipient to create a messanger for auth (str): optional auth code to send with any request for messenger Returns: Optional(TcpWitnesser, HTTPMessenger): witnesser for ensuring full reciepts """ urls = hab.fetchUrls(eid=pre) return messengerFrom(hab, pre, urls, auth)
[docs] def messengerFrom(hab, pre, urls, auth=None): """ Create a Witnesser (tcp or http) based on provided endpoints Parameters: hab (Habitat): Environment to use to look up witness URLs pre (str): qb64 identifier prefix of recipient to create a messanger for urls (dict): map of schemes to urls of available endpoints auth (str): optional auth code to send with any request for messenger Returns: Optional(TcpWitnesser, HTTPMessenger): witnesser for ensuring full reciepts """ if Schemes.http in urls or Schemes.https in urls: url = urls[Schemes.https] if Schemes.https in urls else urls[Schemes.http] witer = HTTPMessenger(hab=hab, wit=pre, url=url, auth=auth) elif Schemes.tcp in urls: url = urls[Schemes.tcp] witer = TCPMessenger(hab=hab, wit=pre, url=url) else: raise ConfigurationError(f"unable to find a valid endpoint for witness {pre}") return witer
[docs] def streamMessengerFrom(hab, pre, urls, msg, headers=None): """Create a stream messenger (HTTP or TCP) for a single outbound message. Parameters: hab (Habitat): Environment to use to look up witness URLs pre (str): qb64 identifier prefix of recipient to create a messanger for urls (dict): map of schemes to urls of available endpoints msg (bytes): bytes of message to send headers (dict): optional headers to send with HTTP requests Returns: Optional(TcpWitnesser, HTTPMessenger): witnesser for ensuring full reciepts """ if Schemes.http in urls or Schemes.https in urls: url = urls[Schemes.https] if Schemes.https in urls else urls[Schemes.http] witer = HTTPStreamMessenger(hab=hab, wit=pre, url=url, msg=msg, headers=headers) elif Schemes.tcp in urls: url = urls[Schemes.tcp] witer = TCPStreamMessenger(hab=hab, wit=pre, url=url) else: raise ConfigurationError(f"unable to find a valid endpoint for witness {pre}") return witer
[docs] def httpClient(hab, wit): """ Create and return a http.client and http.ClientDoer for the witness Parameters: hab (Habitat): Environment to use to look up witness URLs wit (str): qb64 identifier prefix of witness for which to create a client Returns: Client: Http client for connecting to remote identifier ClientDoer: Doer for client """ urls = hab.fetchUrls(eid=wit, scheme=Schemes.https) or hab.fetchUrls(eid=wit, scheme=Schemes.http) if not urls: raise MissingEntryError(f"unable to query witness {wit}, no http endpoint") url = urls[Schemes.https] if Schemes.https in urls else urls[Schemes.http] up = urlparse(url) client = http.clienting.Client(scheme=up.scheme, hostname=up.hostname, port=up.port, path=up.path) clientDoer = http.clienting.ClientDoer(client=client) return client, clientDoer
[docs] def schemes(db, eids): """ Creates a message bytearray of reply messages containing location records (URLs) and witness signatures, if any, for a given list of authorized endpoint role AIDs (eids). Returns: a bytearray of reply messages and their signatures Parameters: db (Baser): Hab database used to retrieve location records and witness signatures eids (list): list of endpoint role AIDs (eids) to retrieve location records and witness signatures for """ msgs = bytearray() for eid in eids: for scheme in Schemes: keys = (eid, scheme) said = db.lans.get(keys=keys) if said is not None: serder = db.rpys.get(keys=(said.qb64,)) cigars = db.scgs.get(keys=(said.qb64,)) if len(cigars) == 1: (verfer, cigar) = cigars[0] cigar.verfer = verfer else: cigar = None msgs.extend(eventing.messagize(serder=serder, cigars=[cigar], framed=False)) return msgs