Source code for keri.app.agenting

# -*- encoding: utf-8 -*-
"""
KERI
keri.app.agenting module

"""
import random
from urllib.parse import urlparse, urljoin

from hio.base import doing
from hio.core import http
from hio.core.tcp import clienting
from hio.help import decking, Hict

from . import httping, forwarding
from .. import help
from .. import kering
from ..core import eventing, parsing, coring, serdering
from ..core.coring import CtrDex
from ..db import dbing
from ..kering import Roles

logger = help.ogler.getLogger()


class Receiptor(doing.DoDoer):
    """
    DoDoer that gathers witness receipts over HTTP.

    Entries queued on .msgs are submitted to every witness of the identifier
    on the "/receipts" path and the collected receipts are then sent on to the
    other witnesses (see .receipt). Entries queued on .gets request the
    receipts for an event from one randomly chosen witness (see .get).

    Attributes:
        hby (Habery): source of .prefixes, .habs and .db used for lookups
        msgs (Deck): inbound dicts with "pre" and optional "sn" to receipt
        gets (Deck): inbound dicts with "pre" and optional "sn" to query
        cues (Deck): outbound, each processed entry of .msgs is pushed here
        clienter (httping.Clienter): issues the GET requests made by .get
    """

    def __init__(self, hby, msgs=None, gets=None, cues=None):
        """
        Parameters:
            hby (Habery): environment holding the local identifiers
            msgs (Deck): optional inbound queue of receipt requests
            gets (Deck): optional inbound queue of receipt queries
            cues (Deck): optional outbound queue of completed receipt requests
        """
        self.msgs = msgs if msgs is not None else decking.Deck()
        self.gets = gets if gets is not None else decking.Deck()
        self.cues = cues if cues is not None else decking.Deck()

        self.clienter = httping.Clienter()

        doers = [self.clienter, doing.doify(self.witDo), doing.doify(self.gitDo)]
        self.hby = hby

        super(Receiptor, self).__init__(doers=doers)

    def receipt(self, pre, sn=None):
        """
        Returns a generator for witness receipting

        The generator submits the designated event to each witness on the
        "/receipts" path, waits for each response, then sends every witness
        that answered with status 200 the receipts of the other witnesses.

        Parameters:
            pre (str): qualified base64 identifier to gather receipts for
            sn (Optional[int]): sequence number of event to gather receipts
                for, latest is used if not provided

        Returns:
            keys view of the witness identifiers that returned receipts, or
            None when the identifier has no witnesses

        Raises:
            kering.MissingEntryError: when pre is not in hby.prefixes
        """
        if pre not in self.hby.prefixes:
            raise kering.MissingEntryError(f"{pre} not a valid AID")

        hab = self.hby.habs[pre]
        sn = sn if sn is not None else hab.kever.sner.num
        wits = hab.kever.wits

        if len(wits) == 0:
            return

        msg = hab.makeOwnEvent(sn=sn)
        ser = serdering.SerderKERI(raw=msg)

        # If we are a rotation event, may need to catch new witnesses up to
        # current key state. Delegated rotations are included so this agrees
        # with the rot/drt handling of witness introductions below.
        if ser.ked['t'] in (coring.Ilks.rot, coring.Ilks.drt):
            adds = ser.ked["ba"]
            for wit in adds:
                yield from self.catchup(ser.pre, wit)

        clients = dict()
        doers = []
        for wit in wits:
            client, clientDoer = httpClient(hab, wit)
            clients[wit] = client
            doers.append(clientDoer)
            self.extend([clientDoer])

        rcts = dict()
        for wit, client in clients.items():
            httping.streamCESRRequests(client=client, dest=wit, ims=bytearray(msg), path="/receipts")
            while not client.responses:
                yield self.tock

            rep = client.respond()
            if rep.status == 200:
                rct = bytearray(rep.body)
                hab.psr.parseOne(bytearray(rct))  # parse a copy, rct is trimmed below
                rserder = serdering.SerderKERI(raw=rct)
                del rct[:rserder.size]  # drop the receipt event body

                # pull off the count code
                coring.Counter(qb64b=rct, strip=True)
                rcts[wit] = rct  # what remains is kept as this witness's attachment
            else:
                logger.error(f"invalid response {rep.status} from witnesses {wit}")

        for wit in rcts.keys():
            ewits = [w for w in rcts.keys() if w != wit]
            wigs = [sig for w, sig in rcts.items() if w != wit]

            msg = bytearray()
            if ser.ked['t'] in (coring.Ilks.icp, coring.Ilks.dip):  # introduce new witnesses
                msg.extend(schemes(self.hby.db, eids=ewits))
            elif ser.ked['t'] in (coring.Ilks.rot, coring.Ilks.drt) and \
                    ("ba" in ser.ked and wit in ser.ked["ba"]):  # Newly added witness, introduce to all
                msg.extend(schemes(self.hby.db, eids=ewits))

            rserder = eventing.receipt(pre=hab.pre, sn=sn, said=ser.said)
            msg.extend(rserder.raw)
            msg.extend(coring.Counter(code=CtrDex.NonTransReceiptCouples, count=len(wigs)).qb64b)
            for wig in wigs:
                msg.extend(wig)

            client = clients[wit]

            sent = httping.streamCESRRequests(client=client, dest=wit, ims=bytearray(msg))
            while len(client.responses) < sent:
                yield self.tock

        self.remove(doers)

        return rcts.keys()

    def get(self, pre, sn=None):
        """
        Returns a generator for witness querying

        The generator picks one witness at random, issues a GET for
        "/receipts?pre=...&sn=..." against its http (or https) endpoint and,
        on a 200 response, parses the returned body with hab.psr.

        Parameters:
            pre (str): qualified base64 identifier to gather receipts for
            sn (Optional[int]): sequence number of event to gather receipts
                for, latest is used if not provided

        Returns:
            bool: True when the witness answered with status 200, None when
            the identifier has no witnesses

        Raises:
            kering.MissingEntryError: when pre is not in hby.prefixes or the
                chosen witness has no http/https endpoint
        """
        if pre not in self.hby.prefixes:
            raise kering.MissingEntryError(f"{pre} not a valid AID")

        hab = self.hby.habs[pre]
        sn = sn if sn is not None else hab.kever.sner.num
        wits = hab.kever.wits

        if len(wits) == 0:
            return

        wit = random.choice(hab.kever.wits)
        urls = hab.fetchUrls(eid=wit, scheme=kering.Schemes.http) or hab.fetchUrls(eid=wit,
                                                                                   scheme=kering.Schemes.https)
        if not urls:
            raise kering.MissingEntryError(f"unable to query witness {wit}, no http endpoint")

        base = urls[kering.Schemes.http] if kering.Schemes.http in urls else urls[kering.Schemes.https]
        url = urljoin(base, f"/receipts?pre={pre}&sn={sn}")

        client = self.clienter.request("GET", url)
        while not client.responses:
            yield self.tock

        rep = client.respond()
        if rep.status == 200:
            rct = bytearray(rep.body)
            hab.psr.parseOne(bytearray(rct))

        self.clienter.remove(client)
        return rep.status == 200

    def catchup(self, pre, wit):
        """
        When adding a new Witness, use this method to catch the witness up to
        the current state of the KEL

        Generator that streams every message of hab.db.clonePreIter(pre=pre)
        to the witness and waits until a response has arrived for every
        request issued so far before sending the next message.

        Parameters:
            pre (str): qualified base64 AID of the KEL to send
            wit (str): qualified base64 AID of the witness to send the KEL to

        Raises:
            kering.MissingEntryError: when pre is not in hby.prefixes
        """
        if pre not in self.hby.prefixes:
            raise kering.MissingEntryError(f"{pre} not a valid AID")

        hab = self.hby.habs[pre]

        client, clientDoer = httpClient(hab, wit)
        self.extend([clientDoer])

        # Responses are never popped here, so compare the number received
        # against the running number of requests. Testing only for
        # "any response" would stop waiting after the first one arrived.
        sent = 0
        for fmsg in hab.db.clonePreIter(pre=pre):
            sent += httping.streamCESRRequests(client=client, dest=wit, ims=bytearray(fmsg))
            while len(client.responses) < sent:
                yield self.tock

        self.remove([clientDoer])

    def witDo(self, tymth=None, tock=0.0):
        """
        Returns doifiable Doist compatible generator method (doer dog) that
        runs .receipt for each entry of .msgs and then pushes that entry
        onto .cues.

        Parameters:
            tymth (function): injected function wrapper closure returned by .tymen() of
                Tymist instance. Calling tymth() returns associated Tymist .tyme.
            tock (float): injected initial tock value

        Usage:
            add result of doify on this method to doers list
        """
        self.wind(tymth)
        self.tock = tock
        _ = (yield self.tock)

        while True:
            while self.msgs:
                msg = self.msgs.popleft()
                pre = msg["pre"]
                sn = msg["sn"] if "sn" in msg else None

                yield from self.receipt(pre, sn)
                self.cues.push(msg)

            yield self.tock

    def gitDo(self, tymth=None, tock=0.0):
        """
        Returns doifiable Doist compatible generator method (doer dog) that
        runs .get for each entry of .gets. The result of .get is discarded.

        Parameters:
            tymth (function): injected function wrapper closure returned by .tymen() of
                Tymist instance. Calling tymth() returns associated Tymist .tyme.
            tock (float): injected initial tock value

        Usage:
            add result of doify on this method to doers list
        """
        self.wind(tymth)
        self.tock = tock
        _ = (yield self.tock)

        while True:
            while self.gets:
                msg = self.gets.popleft()
                pre = msg["pre"]
                sn = msg["sn"] if "sn" in msg else None

                yield from self.get(pre, sn)

            yield self.tock
class WitnessReceiptor(doing.DoDoer):
    """
    Sends messages to all current witnesses of given identifier (from hab) and
    waits for receipts from each of those witnesses and propagates those
    receipts to each of the other witnesses after receiving the complete set.

    Runs continuously: each entry taken from .msgs is processed and then
    pushed onto .cues.

    Could be enhanced to have a `once` method that runs once and cleans up
    and an `all` method that runs and waits for more messages to receipt.
    """

    def __init__(self, hby, msgs=None, cues=None, force=False, **kwa):
        """
        For the current event, gather the current set of witnesses, send the
        event, gather all receipts and send them to all other witnesses

        Parameters:
            hby (Habery): environment holding the identifiers to receipt
            msgs (Deck): incoming messages to publish to witnesses
            cues (Deck): outgoing cues of successful messages
            force (bool): True means to send witnesses all receipts even if
                we have a full complement.
        """
        self.hby = hby
        self.force = force
        self.msgs = msgs if msgs is not None else decking.Deck()
        self.cues = cues if cues is not None else decking.Deck()

        super(WitnessReceiptor, self).__init__(doers=[doing.doify(self.receiptDo)], **kwa)

    def receiptDo(self, tymth=None, tock=0.0):
        """
        Returns doifiable Doist compatible generator method (doer dog)

        For each entry of .msgs (dict with "pre" and optional "sn"): sends the
        event to every witness, waits until the database holds as many
        witness signatures as there are witnesses, sends each witness the
        receipts of the others, waits for the messengers to go idle, removes
        them and pushes the entry onto .cues. Entries whose "pre" is not a
        local hab, or whose identifier has no witnesses, are dropped.

        Usage:
            add result of doify on this method to doers list

        Parameters:
            tymth is injected function wrapper closure returned by .tymen() of
                Tymist instance. Calling tymth() returns associated Tymist .tyme.
            tock is injected initial tock value
        """
        self.wind(tymth)
        self.tock = tock
        _ = (yield self.tock)

        while True:
            while self.msgs:
                evt = self.msgs.popleft()
                pre = evt["pre"]

                if pre not in self.hby.habs:
                    continue

                hab = self.hby.habs[pre]

                sn = evt["sn"] if "sn" in evt else hab.kever.sner.num
                wits = hab.kever.wits

                if len(wits) == 0:
                    continue

                msg = hab.makeOwnEvent(sn=sn)
                ser = serdering.SerderKERI(raw=msg)

                dgkey = dbing.dgKey(ser.preb, ser.saidb)

                witers = []
                for wit in wits:
                    witer = messenger(hab, wit)
                    witers.append(witer)
                    self.extend([witer])

                # Check to see if we already have all the receipts we need for this event
                wigs = hab.db.getWigs(dgkey)
                completed = len(wigs) == len(wits)
                if len(wigs) != len(wits):  # receipts are missing, (re)submit the event
                    for idx, witer in enumerate(witers):
                        wit = wits[idx]

                        for dmsg in hab.db.cloneDelegation(hab.kever):
                            witer.msgs.append(bytearray(dmsg))

                        if ser.ked['t'] in (coring.Ilks.icp, coring.Ilks.dip) or \
                                "ba" in ser.ked and wit in ser.ked["ba"]:
                            # Newly added witness, must send full KEL to catch up
                            for fmsg in hab.db.clonePreIter(pre=pre):
                                witer.msgs.append(bytearray(fmsg))

                        witer.msgs.append(bytearray(msg))  # make a copy
                        _ = (yield self.tock)

                    # wait until one witness signature per witness is stored
                    while True:
                        wigs = hab.db.getWigs(dgkey)
                        if len(wigs) == len(wits):
                            break
                        _ = yield self.tock

                # If we started with all our receipts, exit unless told to force resubmit of all receipts
                if completed and not self.force:
                    # The messengers added above were never given work; remove
                    # them so they do not pile up on this DoDoer per event.
                    self.remove(witers)
                    self.cues.push(evt)
                    continue

                # generate all rct msgs to send to all witnesses
                awigers = [coring.Siger(qb64b=bytes(wig)) for wig in wigs]

                # make sure all witnesses have fully receipted KERL and know about each other
                for witer in witers:
                    ewits = []
                    wigers = []
                    # NOTE(review): pairs awigers with wits by position — assumes
                    # getWigs returns signatures in witness order; confirm.
                    for i, wit in enumerate(wits):
                        if wit == witer.wit:
                            continue
                        ewits.append(wit)
                        wigers.append(awigers[i])

                    if len(wigers) == 0:
                        continue

                    rctMsg = bytearray()

                    # Witnesses may not have met each other yet: prepend the
                    # other witnesses' endpoint replies where needed, then
                    # send the other witnesses' receipts.
                    if ser.ked['t'] in (coring.Ilks.icp, coring.Ilks.dip):  # introduce new witnesses
                        rctMsg.extend(schemes(self.hby.db, eids=ewits))
                    elif ser.ked['t'] in (coring.Ilks.rot, coring.Ilks.drt) and \
                            ("ba" in ser.ked and witer.wit in ser.ked["ba"]):  # Newly added witness, introduce to all
                        rctMsg.extend(schemes(self.hby.db, eids=ewits))

                    rserder = eventing.receipt(pre=ser.pre, sn=sn, said=ser.said)
                    rctMsg.extend(eventing.messagize(serder=rserder, wigers=wigers))

                    witer.msgs.append(rctMsg)
                    _ = (yield self.tock)

                # wait for every messenger to report idle
                while True:
                    done = True
                    for witer in witers:
                        if not witer.idle:
                            yield 1.0
                            done = False
                            break
                    if done:
                        break

                self.remove(witers)

                self.cues.push(evt)

                yield self.tock

            yield self.tock
class WitnessInquisitor(doing.DoDoer):
    """
    Sends query messages to a single, randomly selected endpoint.

    Each entry queued on .msgs (see .query and .telquery) is turned into a
    signed query by the hab and handed to a messenger for either one of the
    explicitly supplied witnesses or an endpoint found through hab.endsFor.
    The first item the messenger reports as sent is moved onto .sent.

    NOTE(review): messengers added with .extend are never removed here —
    confirm whether that is intended.
    """

    def __init__(self, hby, reger=None, msgs=None, klas=None, **kwa):
        """
        Parameters:
            hby (Habery): environment used to resolve habs and kevers
            reger: stored on .reger, not otherwise used by this class
            msgs (Deck): buffer of query requests to process
            klas: stored on .klas, defaults to HTTPMessenger
        """
        self.hby = hby
        self.reger = reger
        self.klas = HTTPMessenger if klas is None else klas
        self.msgs = decking.Deck() if msgs is None else msgs
        self.sent = decking.Deck()

        super(WitnessInquisitor, self).__init__(doers=[doing.doify(self.msgDo)], **kwa)

    def msgDo(self, tymth=None, tock=1.0, **opts):
        """
        Returns doifiable Doist compatible generator method (doer dog)

        Takes one request at a time from .msgs, resolves the hab and a
        messenger for it, queues an introduction (when forwarding.introduce
        returns one) followed by the query, and waits for the messenger's
        first sent item. Requests that cannot be resolved are logged and
        dropped.

        Usage:
            add result of doify on this method to doers list
        """
        self.wind(tymth)
        self.tock = tock
        _ = (yield self.tock)

        while True:
            while not self.msgs:
                yield self.tock

            request = self.msgs.popleft()
            pre = request["pre"]
            target = request["target"]
            src = request["src"]
            route = request["r"]
            qry = request["q"]
            wits = request.get("wits")

            if "hab" in request:
                hab = request["hab"]
            else:
                hab = self.hby.habByPre(src)
                if hab is None:
                    continue

            if wits:
                # caller named the witnesses, pick one of them
                witer = messenger(hab, random.choice(wits))
            else:
                if pre not in self.hby.kevers:
                    logger.error(f"must have KEL for identifier to query {pre}")
                    continue

                ends = hab.endsFor(pre=pre)
                # roles are tried in order of preference
                for role in (Roles.controller, Roles.agent, Roles.witness):
                    if role in ends:
                        end = ends[role]
                        break
                else:
                    logger.error(f"unable query: can not find a valid role for {pre}")
                    continue

                if len(end.items()) == 0:
                    logger.error(f"must have endpoint to query for pre={pre}")
                    continue

                ctrl, locs = random.choice(list(end.items()))
                if len(locs.items()) == 0:
                    logger.error(f"must have location in endpoint to query for pre={pre}")
                    continue

                witer = messengerFrom(hab=hab, pre=ctrl, urls=locs)

            self.extend([witer])

            msg = hab.query(target, src=witer.wit, route=route, query=qry)  # Query for remote pre Event
            kel = forwarding.introduce(hab, witer.wit)
            if kel:
                witer.msgs.append(bytearray(kel))

            witer.msgs.append(bytearray(msg))

            while not witer.sent:
                yield self.tock

            self.sent.append(witer.sent.popleft())

            yield self.tock

    def query(self, pre, r="logs", sn='0', src=None, hab=None, anchor=None, wits=None, **kwa):
        """
        Queue a `qry` request for the prefix; the message itself is created
        and signed later in .msgDo. Nothing is returned.

        Parameters:
            src (str): qb64 identifier prefix of source of query
            hab (Hab): Hab to use instead of src if provided
            pre (str): qb64 identifier prefix being queried for
            r (str): query route
            sn (str): optional specific hex str of sequence number to query for
            anchor (Seal): anchored Seal to search for
            wits (list): witnesses to query
            kwa: accepted and ignored
        """
        qry = {"s": sn}
        if anchor is not None:
            qry["a"] = anchor

        request = {"src": src, "pre": pre, "target": pre, "r": r, "q": qry, "wits": wits}
        if hab is not None:
            request["hab"] = hab

        self.msgs.append(request)

    def telquery(self, ri, src=None, i=None, r="tels", hab=None, pre=None, wits=None, **kwa):
        """
        Queue a query request whose query body is {"ri": ri} and whose target
        is i. Nothing is returned.

        Parameters:
            ri: value placed under "ri" in the query body
            src (str): qb64 identifier prefix of source of query
            i: target of the query
            r (str): query route
            hab (Hab): Hab to use instead of src if provided
            pre (str): identifier prefix used to locate an endpoint
            wits (list): witnesses to query
            kwa: accepted and ignored
        """
        request = {"src": src, "pre": pre, "target": i, "r": r, "wits": wits, "q": {"ri": ri}}
        if hab is not None:
            request["hab"] = hab

        self.msgs.append(request)
class WitnessPublisher(doing.DoDoer):
    """
    Sends messages to all current witnesses of given identifier (from hab).

    Runs continuously: each entry taken from .msgs is sent to every witness,
    the messengers are removed once the expected number of sends has been
    reported, and the entry is pushed onto .cues.

    Could be enhanced to have a `once` method that runs once and cleans up
    and an `all` method that runs and waits for more messages to receipt.
    """

    def __init__(self, hby, msgs=None, cues=None, **kwa):
        """
        Parameters:
            hby (Habery): environment holding the identifiers whose witnesses
                are published to
            msgs (Deck): incoming messages to publish to witnesses
            cues (Deck): outgoing cues of successful messages
        """
        self.hby = hby
        self.msgs = msgs if msgs is not None else decking.Deck()
        self.cues = cues if cues is not None else decking.Deck()
        super(WitnessPublisher, self).__init__(doers=[doing.doify(self.sendDo)], **kwa)

    def sendDo(self, tymth=None, tock=0.0, **opts):
        """
        Returns doifiable Doist compatible generator method (doer dog)

        Each entry of .msgs is a dict with "pre" and "msg". Entries whose
        "pre" is not a local hab are dropped.

        Usage:
            add result of doify on this method to doers list
        """
        self.wind(tymth)
        self.tock = tock
        _ = (yield self.tock)

        while True:
            while self.msgs:
                evt = self.msgs.popleft()
                pre = evt["pre"]
                msg = evt["msg"]

                if pre not in self.hby.habs:
                    continue

                hab = self.hby.habs[pre]
                wits = hab.kever.wits

                witers = []
                for wit in wits:
                    witer = messenger(hab, wit)
                    witers.append(witer)
                    witer.msgs.append(bytearray(msg))  # make a copy so everyone munges their own
                    self.extend([witer])

                    _ = (yield self.tock)

                total = len(witers)
                count = 0
                while count < total:
                    # Recount from scratch on every pass. Adding to a running
                    # total would count the same sent item again each pass and
                    # end the wait before all witnesses were served.
                    count = sum(len(witer.sent) for witer in witers)
                    _ = (yield self.tock)

                self.remove(witers)
                self.cues.push(evt)

                yield self.tock

            yield self.tock

    def sent(self, said):
        """
        Check if message with given SAID was sent

        Parameters:
            said (str): qb64 SAID of message to check for

        Returns:
            bool: True when a cue whose "said" entry equals said is on .cues
        """
        for cue in self.cues:
            if cue["said"] == said:
                return True

        return False
class TCPMessenger(doing.DoDoer):
    """
    Sends queued messages to one remote over a direct TCP connection and
    feeds whatever comes back through a Parser into a Kevery.
    """

    def __init__(self, hab, wit, url, msgs=None, sent=None, doers=None, **kwa):
        """
        Parameters:
            hab: Habitat whose database backs the Kevery for inbound messages
            wit: identifier of the remote, stored on .wit
            url: tcp url of the remote, parsed when the doer starts
            msgs (Deck): optional outbound message queue
            sent (Deck): optional queue receiving each message once transmitted
            doers (list): optional extra doers; this list is appended to
            kwa: forwarded to eventing.Kevery
        """
        self.hab = hab
        self.wit = wit
        self.url = url
        self.posted = 0  # number of messages handed to the client so far
        self.msgs = decking.Deck() if msgs is None else msgs
        self.sent = decking.Deck() if sent is None else sent
        self.parser = None  # created in receiptDo once the client exists

        if doers is None:
            doers = []
        doers.append(doing.doify(self.receiptDo))

        self.kevery = eventing.Kevery(db=self.hab.db, **kwa)

        super(TCPMessenger, self).__init__(doers=doers)

    def receiptDo(self, tymth=None, tock=0.0):
        """
        Returns doifiable Doist compatible generator method (doer dog)

        Opens the TCP client for .url, starts .msgDo on its receive buffer,
        then transmits .msgs one at a time, appending each message to .sent
        after the client's transmit buffer has emptied.

        Raises:
            ValueError: when the url scheme is not tcp

        Usage:
            add result of doify on this method to doers list
        """
        self.wind(tymth)
        self.tock = tock
        _ = (yield self.tock)

        parts = urlparse(self.url)
        if parts.scheme != kering.Schemes.tcp:
            raise ValueError(f"invalid scheme {parts.scheme} for TcpWitnesser")

        remote = clienting.Client(host=parts.hostname, port=parts.port)
        self.parser = parsing.Parser(ims=remote.rxbs, framed=True, kvy=self.kevery)

        self.extend([clienting.ClientDoer(client=remote), doing.doify(self.msgDo)])

        while True:
            while not self.msgs:
                yield self.tock

            outbound = self.msgs.popleft()
            self.posted += 1

            remote.tx(outbound)  # send to connected remote

            while remote.txbs:  # wait for the transmit buffer to drain
                yield self.tock

            self.sent.append(outbound)
            yield self.tock

    def msgDo(self, tymth=None, tock=0.0, **opts):
        """
        Returns doifiable Doist compatible generator method (doer dog) that
        delegates to .parser.parsator() to process the incoming message stream.

        Parameters:
            tymth is injected function wrapper closure returned by .tymen() of
                Tymist instance. Calling tymth() returns associated Tymist .tyme.
            tock is injected initial tock value
            opts is dict of injected optional additional parameters

        Usage:
            add result of doify on this method to doers list
        """
        yield from self.parser.parsator()  # process messages continuously

    @property
    def idle(self):
        """True when every message handed to the client is also on .sent"""
        return self.posted == len(self.sent)
class TCPStreamMessenger(doing.DoDoer):
    """
    Sends queued messages to one remote over a direct TCP connection and
    feeds whatever comes back through a Parser into a Kevery.
    """

    def __init__(self, hab, wit, url, msgs=None, sent=None, doers=None, **kwa):
        """
        Parameters:
            hab: Habitat whose database backs the Kevery for inbound messages
            wit: identifier of the remote, stored on .wit
            url: tcp url of the remote, parsed when the doer starts
            msgs (Deck): optional outbound message queue
            sent (Deck): optional queue receiving each message once transmitted
            doers (list): optional extra doers; this list is appended to
            kwa: forwarded to eventing.Kevery
        """
        self.hab = hab
        self.wit = wit
        self.url = url
        self.posted = 0  # number of messages handed to the client so far
        self.msgs = decking.Deck() if msgs is None else msgs
        self.sent = decking.Deck() if sent is None else sent
        self.parser = None  # created in receiptDo once the client exists

        if doers is None:
            doers = []
        doers.append(doing.doify(self.receiptDo))

        self.kevery = eventing.Kevery(db=self.hab.db, **kwa)

        super(TCPStreamMessenger, self).__init__(doers=doers)

    def receiptDo(self, tymth=None, tock=0.0):
        """
        Returns doifiable Doist compatible generator method (doer dog)

        Opens the TCP client for .url, starts .msgDo on its receive buffer,
        then transmits .msgs one at a time, appending each message to .sent
        after the client's transmit buffer has emptied.

        Raises:
            ValueError: when the url scheme is not tcp

        Usage:
            add result of doify on this method to doers list
        """
        self.wind(tymth)
        self.tock = tock
        _ = (yield self.tock)

        parts = urlparse(self.url)
        if parts.scheme != kering.Schemes.tcp:
            raise ValueError(f"invalid scheme {parts.scheme} for TcpWitnesser")

        remote = clienting.Client(host=parts.hostname, port=parts.port)
        self.parser = parsing.Parser(ims=remote.rxbs, framed=True, kvy=self.kevery)

        self.extend([clienting.ClientDoer(client=remote), doing.doify(self.msgDo)])

        while True:
            while not self.msgs:
                yield self.tock

            outbound = self.msgs.popleft()
            self.posted += 1

            remote.tx(outbound)  # send to connected remote

            while remote.txbs:  # wait for the transmit buffer to drain
                yield self.tock

            self.sent.append(outbound)
            yield self.tock

    def msgDo(self, tymth=None, tock=0.0, **opts):
        """
        Returns doifiable Doist compatible generator method (doer dog) that
        delegates to .parser.parsator() to process the incoming message stream.

        Parameters:
            tymth is injected function wrapper closure returned by .tymen() of
                Tymist instance. Calling tymth() returns associated Tymist .tyme.
            tock is injected initial tock value
            opts is dict of injected optional additional parameters

        Usage:
            add result of doify on this method to doers list
        """
        yield from self.parser.parsator()  # process messages continuously

    @property
    def idle(self):
        """True when every message handed to the client is also on .sent"""
        return self.posted == len(self.sent)
class HTTPMessenger(doing.DoDoer):
    """
    Sends queued messages to one recipient as HTTP requests and collects the
    responses on .sent.
    """

    def __init__(self, hab, wit, url, msgs=None, sent=None, doers=None, **kwa):
        """
        Parameters:
            hab: Habitat, stored on .hab
            wit: identifier of the recipient, used as request destination
            url: http or https url of the recipient
            msgs (Deck): optional outbound message queue
            sent (Deck): optional queue receiving each response
            doers (list): optional extra doers; this list is appended to
            kwa: forwarded to DoDoer

        Raises:
            ValueError: when the url scheme is neither http nor https
        """
        self.hab = hab
        self.wit = wit
        self.posted = 0  # number of requests issued so far
        self.msgs = msgs if msgs is not None else decking.Deck()
        self.sent = sent if sent is not None else decking.Deck()
        self.parser = None
        doers = doers if doers is not None else []
        doers.extend([doing.doify(self.msgDo), doing.doify(self.responseDo)])

        up = urlparse(url)
        if up.scheme != kering.Schemes.http and up.scheme != kering.Schemes.https:
            raise ValueError(f"invalid scheme {up.scheme} for HTTPMessenger")

        self.client = http.clienting.Client(scheme=up.scheme, hostname=up.hostname, port=up.port)
        clientDoer = http.clienting.ClientDoer(client=self.client)

        doers.extend([clientDoer])

        super(HTTPMessenger, self).__init__(doers=doers, **kwa)

    def msgDo(self, tymth=None, tock=0.0):
        """
        Returns doifiable Doist compatible generator method (doer dog)

        Takes messages from .msgs one at a time, streams each to the
        recipient, adds the number of requests created to .posted and waits
        until the client's request queue is empty.

        Usage:
            add result of doify on this method to doers list
        """
        self.wind(tymth)
        self.tock = tock
        _ = (yield self.tock)

        while True:
            while not self.msgs:
                yield self.tock

            msg = self.msgs.popleft()
            self.posted += httping.streamCESRRequests(client=self.client, dest=self.wit, ims=msg)
            while self.client.requests:
                yield self.tock

            yield self.tock

    def responseDo(self, tymth=None, tock=0.0):
        """
        Processes responses from client and adds them to sent cue

        Parameters:
            tymth is injected function wrapper closure returned by .tymen() of
                Tymist instance. Calling tymth() returns associated Tymist .tyme.
            tock is injected initial tock value
        """
        self.wind(tymth)
        self.tock = tock
        _ = (yield self.tock)

        while True:
            while self.client.responses:
                rep = self.client.respond()
                self.sent.append(rep)
                # yield the tock like every other doer in this module rather
                # than a bare yield, which hands None to the scheduler
                yield self.tock
            yield self.tock

    @property
    def idle(self):
        """True when .msgs is empty and a response arrived for every request"""
        return len(self.msgs) == 0 and self.posted == len(self.sent)
class HTTPStreamMessenger(doing.DoDoer):
    """
    Issues a single HTTP PUT carrying one message to a recipient and keeps
    the first response on .rep.
    """

    def __init__(self, hab, wit, url, msg=b'', headers=None, **kwa):
        """
        Builds the client and queues the PUT request immediately.

        Parameters:
            hab: Habitat, stored on .hab
            wit: identifier of the recipient, sent in the CESR destination header
            url: http or https url of the recipient
            msg (bytes): body of the request
            headers (dict): optional headers added after the default ones
            kwa: forwarded to DoDoer

        Raises:
            ValueError: when the url scheme is neither http nor https
        """
        self.hab = hab
        self.wit = wit
        self.rep = None  # set by recur once a response is available

        extra = {} if headers is None else headers

        parts = urlparse(url)
        if parts.scheme not in (kering.Schemes.http, kering.Schemes.https):
            raise ValueError(f"invalid scheme {parts.scheme} for HTTPMessenger")

        self.client = http.clienting.Client(scheme=parts.scheme, hostname=parts.hostname, port=parts.port)
        clientDoer = http.clienting.ClientDoer(client=self.client)

        fields = [
            ("Content-Type", "application/cesr"),
            ("Content-Length", len(msg)),
            (httping.CESR_DESTINATION_HEADER, self.wit),
        ]
        fields.extend(extra.items())

        self.client.request(method="PUT", path="/", headers=Hict(fields), body=bytes(msg))

        super(HTTPStreamMessenger, self).__init__(doers=[clientDoer], **kwa)

    def recur(self, tyme, deeds=None):
        """
        Completes (returns True) as soon as the client has a response, which
        is stored on .rep; otherwise defers to DoDoer.recur.

        Parameters:
            tyme: passed through to DoDoer.recur
            deeds: passed through to DoDoer.recur
        """
        if not self.client.responses:
            return super(HTTPStreamMessenger, self).recur(tyme, deeds)

        self.rep = self.client.respond()
        # NOTE(review): this passes the client, not its ClientDoer — confirm
        # that is what remove() is meant to receive.
        self.remove([self.client])
        return True
def mailbox(hab, cid):
    """
    Pick a mailbox identifier for cid.

    Returns the eid of the first allowed mailbox end record found for cid.
    Otherwise falls back to a randomly chosen witness of cid, or None when
    cid has no kever or no witnesses.

    Parameters:
        hab (Habitat): environment providing .db.ends and .kevers
        cid (str): identifier to find a mailbox for
    """
    for keys, record in hab.db.ends.getItemIter(keys=(cid, kering.Roles.mailbox)):
        _, _, eid = keys
        if record.allowed:
            return eid

    if cid not in hab.kevers:
        return None

    candidates = hab.kevers[cid].wits
    return random.choice(candidates) if candidates else None
def messenger(hab, pre):
    """
    Create a Messenger (tcp or http) from the endpoints hab has on record
    for the recipient.

    Parameters:
        hab (Habitat): Environment to use to look up witness URLs
        pre (str): qb64 identifier prefix of recipient to create a messenger for

    Returns:
        whatever messengerFrom builds for the fetched urls
    """
    return messengerFrom(hab, pre, hab.fetchUrls(eid=pre))
def messengerFrom(hab, pre, urls):
    """
    Create a Messenger (tcp or http) based on provided endpoints

    An http url is preferred, then https, then tcp.

    Parameters:
        hab (Habitat): Environment passed on to the messenger
        pre (str): qb64 identifier prefix of recipient to create a messenger for
        urls (dict): map of schemes to urls of available endpoints

    Returns:
        HTTPMessenger or TCPMessenger for the selected url

    Raises:
        kering.ConfigurationError: when urls has no http, https or tcp entry
    """
    for scheme in (kering.Schemes.http, kering.Schemes.https):
        if scheme in urls:
            return HTTPMessenger(hab=hab, wit=pre, url=urls[scheme])

    if kering.Schemes.tcp in urls:
        return TCPMessenger(hab=hab, wit=pre, url=urls[kering.Schemes.tcp])

    raise kering.ConfigurationError(f"unable to find a valid endpoint for witness {pre}")
def streamMessengerFrom(hab, pre, urls, msg, headers=None):
    """
    Create a stream Messenger (tcp or http) based on provided endpoints

    An http url is preferred, then https, then tcp.

    Parameters:
        hab (Habitat): Environment passed on to the messenger
        pre (str): qb64 identifier prefix of recipient to create a messenger for
        urls (dict): map of schemes to urls of available endpoints
        msg (bytes): bytes of message to send
        headers (dict): optional headers to send with HTTP requests, not
            used for tcp endpoints

    Returns:
        HTTPStreamMessenger or TCPStreamMessenger for the selected url

    Raises:
        kering.ConfigurationError: when urls has no http, https or tcp entry
    """
    if kering.Schemes.http in urls or kering.Schemes.https in urls:
        url = urls[kering.Schemes.http] if kering.Schemes.http in urls else urls[kering.Schemes.https]
        witer = HTTPStreamMessenger(hab=hab, wit=pre, url=url, msg=msg, headers=headers)
    elif kering.Schemes.tcp in urls:
        url = urls[kering.Schemes.tcp]
        witer = TCPStreamMessenger(hab=hab, wit=pre, url=url)
        # The TCP messenger only transmits what is on its queue; without this
        # the message handed to this function would never be sent.
        if msg:
            witer.msgs.append(bytearray(msg))
    else:
        raise kering.ConfigurationError(f"unable to find a valid endpoint for witness {pre}")

    return witer
def httpClient(hab, wit):
    """
    Create and return a http.client and http.ClientDoer for the witness

    The http endpoint is looked up first; https is only consulted when no
    http url is found.

    Parameters:
        hab (Habitat): Environment to use to look up witness URLs
        wit (str): qb64 identifier prefix of witness for which to create a client

    Returns:
        Client: Http client for connecting to remote identifier
        ClientDoer: Doer for client

    Raises:
        kering.MissingEntryError: when neither lookup returns a url
    """
    urls = hab.fetchUrls(eid=wit, scheme=kering.Schemes.http)
    if not urls:
        urls = hab.fetchUrls(eid=wit, scheme=kering.Schemes.https)
    if not urls:
        raise kering.MissingEntryError(f"unable to query witness {wit}, no http endpoint")

    if kering.Schemes.http in urls:
        target = urlparse(urls[kering.Schemes.http])
    else:
        target = urlparse(urls[kering.Schemes.https])

    client = http.clienting.Client(scheme=target.scheme, hostname=target.hostname, port=target.port)
    return client, http.clienting.ClientDoer(client=client)
def schemes(db, eids):
    """
    Collect the stored location scheme replies for the given identifiers.

    For every eid and every scheme in kering.Schemes, looks up the SAID in
    db.lans and, when present, appends the messagized reply (from db.rpys)
    with its cigar (from db.scgs) to the result.

    Parameters:
        db: database providing .lans, .rpys and .scgs
        eids (list): identifiers whose endpoint replies are wanted

    Returns:
        bytearray: concatenation of all messagized replies, empty when none
    """
    stream = bytearray()
    for eid in eids:
        for scheme in kering.Schemes:
            said = db.lans.get(keys=(eid, scheme))
            if said is None:
                continue

            lookup = (said.qb64,)
            serder = db.rpys.get(keys=lookup)
            cigars = db.scgs.get(keys=lookup)

            cigar = None
            if len(cigars) == 1:
                verfer, cigar = cigars[0]
                cigar.verfer = verfer

            # NOTE(review): when there is not exactly one cigar this passes
            # [None] to messagize — confirm that is handled downstream.
            stream.extend(eventing.messagize(serder=serder, cigars=[cigar], pipelined=True))

    return stream