# -*- encoding: utf-8 -*-
"""
KERI
keri.app.agenting module
"""
import random
from urllib.parse import urlparse, urljoin
from hio.base import doing
from hio.core import http
from hio.core.tcp import clienting
from hio.help import decking, Hict, ogler
from socket import gaierror
from .httping import Clienter, streamCESRRequests, CESR_DESTINATION_HEADER
from ..kering import (Schemes, Roles, Vrsn_1_0,
MissingEntryError, ConfigurationError,
MissingEntryError)
from ..core import Counter, eventing, parsing, coring, serdering, Codens
logger = ogler.getLogger()
[docs]
class Receiptor(doing.DoDoer):
    """
    Receiptor is a parent task orchestrating both initial receipt retrieval of KEL events and
    subsequent retrieval of receipts for specific events based on queries.

    Child doers created at init: the shared Clienter, witDo (drains .msgs through .receipt)
    and gitDo (drains .gets through .get).
    """

    def __init__(self, hby, msgs=None, gets=None, cues=None):
        """
        Initialize the Receiptor and create doers for processing and retrieving witness receipts.

        Parameters:
            hby (Habery): Habery to pull identifiers and witnesses for those identifiers from
            msgs (Deck): KEL event messages to publish to witnesses for receipting
                         Messages should have {"pre": <str>, "sn": <int>, "auths": <dict>}
            gets (Deck): query messages of KEL events to retrieve receipts from witnesses for
                         Messages should have {"pre": <str>, "sn": <int>}
            cues (Deck): outgoing cues; witDo pushes each processed message from msgs here
        """
        self.hby = hby
        self.msgs = msgs if msgs is not None else decking.Deck()
        self.gets = gets if gets is not None else decking.Deck()
        self.cues = cues if cues is not None else decking.Deck()
        self.clienter = Clienter()

        doers = [self.clienter, doing.doify(self.witDo), doing.doify(self.gitDo)]
        super(Receiptor, self).__init__(doers=doers)

    def receipt(self, pre, sn=None, auths=None):
        """Returns a generator performing witness receipting of KEL events.

        Submits the designated event to each witness on the "/receipts" path, collects the
        receipt attachments returned with HTTP 200, then propagates to every witness the
        receipts of all the other witnesses.
        Delegates to .catchup to catch up any newly added witnesses ("ba") of a rotation.

        Parameters:
            pre (str): qualified base64 identifier to gather receipts for
            sn (Optional[int]): sequence number of event to gather receipts for, latest is used if not provided
            auths (Optional[dict]): map of witness AIDs to values placed in the Authorization header

        Returns:
            dict keys view: identifiers of witnesses that returned receipts (generator return value).
            None when the identifier has no witnesses.

        Raises:
            MissingEntryError: when pre is not one of the local prefixes
        """
        auths = auths if auths is not None else dict()
        if pre not in self.hby.prefixes:
            raise MissingEntryError(f"{pre} not a valid AID")

        hab = self.hby.habs[pre]
        sn = sn if sn is not None else hab.kever.sner.num
        wits = hab.kever.wits
        if len(wits) == 0:
            return

        msg = hab.msgOwnEvent(sn=sn, framed=True)
        ser = serdering.SerderKERI(raw=msg)
        ilk = ser.ked['t']

        # If we are a rotation event, may need to catch new witnesses up to current key state
        # NOTE(review): only `rot` triggers catch-up here while the introduction logic below
        # also handles `drt` -- confirm whether delegated rotations should catch up as well.
        if ilk in (coring.Ilks.rot,):
            for wit in ser.ked["ba"]:
                yield from self.catchup(ser.pre, wit)

        clients = dict()
        doers = []
        for wit in wits:
            try:
                client, clientDoer = httpClient(hab, wit)
                clients[wit] = client
                doers.append(clientDoer)
                self.extend([clientDoer])
            except (MissingEntryError, gaierror) as e:
                # a witness without a usable endpoint is skipped rather than failing the whole run
                logger.error(f"unable to create http client for witness {wit}: {e}")

        # send to each witness and gather receipts
        rcts = dict()
        for wit, client in clients.items():
            headers = dict()
            if wit in auths:
                headers["Authorization"] = auths[wit]

            streamCESRRequests(client=client, dest=wit, ims=bytearray(msg), path="/receipts", headers=headers)
            while not client.responses:
                yield self.tock

            rep = client.respond()
            if rep.status == 200:
                rct = bytearray(rep.body)
                hab.psr.parseOne(bytearray(rct))  # parse a copy so rct stays intact
                rserder = serdering.SerderKERI(raw=rct)
                del rct[:rserder.size]  # drop the receipt body, keep the attachments
                # pull off the count code
                Counter(qb64b=rct, strip=True, version=Vrsn_1_0)
                rcts[wit] = rct
            else:
                # was a bare print(); use the module logger like the rest of this file
                logger.error(f"invalid response {rep.status} from witnesses {wit}")

        # send retrieved receipts to all other witnesses
        for wit in rcts:
            ewits = [w for w in rcts if w != wit]  # get complement of all other witnesses
            wigers = [rcts[w] for w in ewits]  # all other witness signatures

            newlyAdded = (ilk in (coring.Ilks.rot, coring.Ilks.drt)
                          and "ba" in ser.ked and wit in ser.ked["ba"])

            rctMsg = bytearray()
            # introduce the other witnesses on inception or when this witness was just added
            if ilk in (coring.Ilks.icp, coring.Ilks.dip) or newlyAdded:
                rctMsg.extend(schemes(self.hby.db, eids=ewits))

            rserder = eventing.receipt(pre=hab.pre,
                                       sn=sn,
                                       said=ser.said,
                                       version=ser.pvrsn,
                                       kind=ser.kind)
            rctMsg.extend(rserder.raw)
            rctMsg.extend(Counter(Codens.NonTransReceiptCouples,
                                  count=len(wigers), version=Vrsn_1_0).qb64b)
            for wiger in wigers:
                rctMsg.extend(wiger)

            client = clients[wit]
            sent = streamCESRRequests(client=client, dest=wit, ims=bytearray(rctMsg))
            while len(client.responses) < sent:
                yield self.tock

        self.remove(doers)
        return rcts.keys()

    def get(self, pre, sn=None):
        """
        Queries a random witness for the receipt of the event at the sequence number for a prefix.

        Returns:
            a generator requesting receipts for event identified by pre and sn; its return value
            is True when the witness answered with HTTP 200, False otherwise, and None when the
            identifier has no witnesses.

        Parameters:
            pre (str): qualified base64 identifier to gather a receipt for
            sn (Optional[int]): sequence number of event to gather receipts for, latest is used if not provided

        Raises:
            MissingEntryError: when pre is not a local prefix or the chosen witness has no
                               http(s) endpoint
        """
        if pre not in self.hby.prefixes:
            raise MissingEntryError(f"{pre} not a valid AID")

        hab = self.hby.habs[pre]
        sn = sn if sn is not None else hab.kever.sner.num
        wits = hab.kever.wits
        if len(wits) == 0:
            return

        wit = random.choice(hab.kever.wits)
        # prefer https, fall back to http
        urls = hab.fetchUrls(eid=wit, scheme=Schemes.https) or hab.fetchUrls(eid=wit, scheme=Schemes.http)
        if not urls:
            raise MissingEntryError(f"unable to query witness {wit}, no http endpoint")

        base = urls[Schemes.https] if Schemes.https in urls else urls[Schemes.http]
        url = urljoin(base, f"/receipts?pre={pre}&sn={sn}")

        client = self.clienter.request("GET", url)
        while not client.responses:
            yield self.tock

        rep = client.respond()
        if rep.status == 200:
            rct = bytearray(rep.body)
            hab.psr.parseOne(bytearray(rct))

        self.clienter.remove(client)
        return rep.status == 200

    def catchup(self, pre, wit):
        """
        A generator that catches up a target witness with the latest KEL state of a prefix.
        When adding a new Witness use this method to catch the witness up to the current state of the KEL

        Each cloned KEL message is streamed to the witness and the generator waits until the
        witness has answered every request posted so far before sending the next message.

        Returns:
            a generator that sends the KEL to the witness

        Parameters:
            pre (str): qualified base64 AID of the KEL to send
            wit (str): qualified base64 AID of the witness to send the KEL to

        Raises:
            MissingEntryError: when pre is not one of the local prefixes
        """
        if pre not in self.hby.prefixes:
            raise MissingEntryError(f"{pre} not a valid AID")

        hab = self.hby.habs[pre]

        client, clientDoer = httpClient(hab, wit)
        self.extend([clientDoer])

        # Responses are never popped here, so waiting on "any response" would only block for
        # the first message. Track the running request count instead.
        posted = 0
        for fmsg in hab.db.clonePreIter(pre=pre):
            posted += streamCESRRequests(client=client, dest=wit, ims=bytearray(fmsg))
            while len(client.responses) < posted:
                yield self.tock

        self.remove([clientDoer])

    def witDo(self, tymth=None, tock=0.0, **kwa):
        """
        A generator that obtains witness receipts for one key event message at a tyme by processing all messages in
        the .msgs buffer and adds processed messages to cues. Catches up any new witnesses in "adds"
        to the current state of the KEL using self.catchup.
        Intended to be used with doify to create a generator Doer.
        Delegates to the internal receipt generator function.

        Returns:
            a Hio generator function to be used as a Doer.

        Parameters:
            tymth (function): function returning cycle time for configuring this Doer's cycle time.
            tock (float): cycle time for this Doer, default is 0.0 seconds.

        Usage:
            add result of doify on this method to doers list
        """
        self.wind(tymth)
        self.tock = tock
        _ = (yield self.tock)

        while True:
            while self.msgs:
                msg = self.msgs.popleft()
                pre = msg["pre"]
                sn = msg["sn"] if "sn" in msg else None
                auths = msg["auths"] if "auths" in msg else None

                yield from self.receipt(pre, sn, auths)
                self.cues.push(msg)

            yield self.tock

    def gitDo(self, tymth=None, tock=0.0, **kwa):
        """
        A generator that obtains a witness receipts for key event messages specified in the query messages in the
        .gets buffer
        Intended to be used with doify to create a generator Doer.

        Returns:
            a Hio generator function to be used as a Doer.

        Parameters:
            tymth (function): function returning cycle time for configuring this Doer's cycle time.
            tock (float): cycle time for this Doer, default is 0.0 seconds.

        Usage:
            add result of doify on this method to doers list
        """
        self.wind(tymth)
        self.tock = tock
        _ = (yield self.tock)

        while True:
            while self.gets:
                msg = self.gets.popleft()
                pre = msg["pre"]
                sn = msg["sn"] if "sn" in msg else None

                yield from self.get(pre, sn)

            yield self.tock
[docs]
class WitnessReceiptor(doing.DoDoer):
    """
    Sends receipt messages to all current witnesses of given identifier (from hab), waits
    for receipts from each of those witnesses, and propagates those receipts to each
    of the other witnesses after receiving the complete set.

    The per-event messengers are removed once all witnesses have been sent the entire
    receipt set. Could be enhanced to have a `once` method that runs once and cleans up
    and an `all` method that runs and waits for more messages to receipt.
    """

    def __init__(self, hby, msgs=None, cues=None, force=False, auths=None, **kwa):
        """
        For the current event, gather the current set of witnesses, send the event,
        gather all receipts and send them to all other witnesses

        Parameters:
            hby (Habery): Habery holding the habs of the identifiers to receipt
            msgs (Deck): events to send the event and receipt to all witnesses
                         Messages should have {"pre": <str>, "sn": <int>, "auths": <dict>}
            cues (Deck): outgoing cues of events confirmed as fully receipted
                         Messages have {"pre": <str>, "sn": <int>, "auths": <dict>}
            force (bool): True means to send witnesses all receipts even if we have a full complement.
            auths (dict): map of witness AIDs to auth values handed to each witness messenger
        """
        self.hby = hby
        self.force = force
        self.msgs = msgs if msgs is not None else decking.Deck()
        self.cues = cues if cues is not None else decking.Deck()
        self.auths = auths if auths is not None else dict()

        super(WitnessReceiptor, self).__init__(doers=[doing.doify(self.receiptDo)], **kwa)

    def receiptDo(self, tymth=None, tock=0.0, **kwa):
        """
        Sends events, their receipts, receipt signatures, delegation chain, and location record
        URLs between witnesses in the set of current witnesses.

        Returns:
            a doifiable Hio generator to perform event and receipt sending.

        Usage:
            add result of doify on this method to doers list

        Parameters:
            tymth (function): function returning cycle time for configuring this Doer's cycle time.
            tock (float): cycle time for this Doer, default is 0.0 seconds.
        """
        self.wind(tymth)
        self.tock = tock
        _ = (yield self.tock)

        while True:
            while self.msgs:
                evt = self.msgs.popleft()
                pre = evt["pre"]

                if pre not in self.hby.habs:
                    continue

                hab = self.hby.habs[pre]
                sn = evt["sn"] if "sn" in evt else hab.kever.sner.num
                wits = hab.kever.wits

                if len(wits) == 0:
                    continue

                msg = hab.msgOwnEvent(sn=sn, framed=True)
                ser = serdering.SerderKERI(raw=msg)

                # one messenger per witness, in the same order as wits
                witers = []
                for wit in wits:
                    auth = self.auths[wit] if wit in self.auths else None
                    witer = messenger(hab, wit, auth=auth)
                    witers.append(witer)
                    self.extend([witer])

                # Check to see if we already have all the receipts we need for this event
                wigers = hab.db.wigs.get(keys=(ser.preb, ser.saidb))
                completed = len(wigers) == len(wits)
                if not completed:  # receipts are missing, (re)submit the event to every witness
                    for idx, witer in enumerate(witers):
                        wit = wits[idx]

                        for dmsg in hab.db.cloneDelegation(hab.kever):
                            witer.msgs.append(bytearray(dmsg))

                        if ser.ked['t'] in (coring.Ilks.icp, coring.Ilks.dip) or \
                                "ba" in ser.ked and wit in ser.ked["ba"]:
                            # Newly added witness, must send full KEL to catch up
                            for fmsg in hab.db.clonePreIter(pre=pre):
                                witer.msgs.append(bytearray(fmsg))

                        witer.msgs.append(bytearray(msg))  # make a copy
                        _ = (yield self.tock)

                    # wait until the database holds one witness signature per witness
                    while True:
                        wigers = hab.db.wigs.get(keys=(ser.preb, ser.saidb))
                        if len(wigers) == len(wits):
                            break
                        _ = yield self.tock

                # If we started with all our receipts, exit unless told to force resubmit of all receipts
                if completed and not self.force:
                    # the messengers were already added above; drop them so they do not accumulate
                    self.remove(witers)
                    self.cues.push(evt)
                    continue

                # generate all rct msgs to send to all witnesses
                awigers = wigers

                # make sure all witnesses have fully receipted KERL and know about each other
                for witer in witers:
                    ewits = []
                    wigers = []
                    # assumes awigers is ordered like wits -- TODO confirm
                    for i, wit in enumerate(wits):
                        if wit == witer.wit:
                            continue
                        ewits.append(wit)
                        wigers.append(awigers[i])

                    if len(wigers) == 0:
                        continue

                    rctMsg = bytearray()

                    # Witnesses that have not met each other yet are sent the others' location records
                    if ser.ked['t'] in (coring.Ilks.icp, coring.Ilks.dip):  # introduce new witnesses
                        rctMsg.extend(schemes(self.hby.db, eids=ewits))
                    elif ser.ked['t'] in (coring.Ilks.rot, coring.Ilks.drt) and \
                            ("ba" in ser.ked and witer.wit in ser.ked["ba"]):  # Newly added witness, introduce to all
                        rctMsg.extend(schemes(self.hby.db, eids=ewits))

                    rserder = eventing.receipt(pre=ser.pre,
                                               sn=sn,
                                               said=ser.said,
                                               version=ser.pvrsn,
                                               kind=ser.kind)
                    rctMsg.extend(eventing.messagize(serder=rserder, wigers=wigers, framed=True))

                    witer.msgs.append(rctMsg)
                    _ = (yield self.tock)

                # wait for every messenger to drain before cleaning up
                while True:
                    done = True
                    for witer in witers:
                        if not witer.idle:
                            yield 1.0
                            done = False
                            break
                    if done:
                        break

                self.remove(witers)

                self.cues.push(evt)
                yield self.tock

            yield self.tock
[docs]
class WitnessInquisitor(doing.DoDoer):
    """
    Sends KEL and TEL query messages to different types of targets whether witnesses, controllers, or agents
    based on the locally available endpoint role records for the query target.

    A query goes either to a randomly chosen witness from the supplied list or to a randomly
    chosen endpoint of the first role found among controller, agent and witness.

    TODO: possibly rename based on the fact that multiple types of targets are supported (controller, agent, witness)
    """

    def __init__(self, hby, msgs=None, klas=None, **kwa):
        """
        Initialize the WitnessInquisitor with the given parameters.

        Parameters:
            hby (Habery): Habery context to use to retrieve the source Hab for reading endpoint role records
            klas (class): Type of messenger to use to send messages; defaults to HTTPMessenger; currently unused
            msgs (decking.Deck): query message buffer to be sent to the target or a random witness

        Attributes:
            hby (Habery): Habery context to use to retrieve the source Hab for reading endpoint role records
            klas (class): Type of messenger to use to send messages; defaults to HTTPMessenger; currently unused
            msgs (decking.Deck): query message buffer to be sent to the target or a random witness
            sent (decking.Deck): buffer for sent messages to track sent queries
        """
        self.hby = hby
        self.klas = HTTPMessenger if klas is None else klas
        self.msgs = decking.Deck() if msgs is None else msgs
        self.sent = decking.Deck()

        super(WitnessInquisitor, self).__init__(doers=[doing.doify(self.msgDo)], **kwa)

    def msgDo(self, tymth=None, tock=1.0, **opts):
        """
        Signs and sends the queued KEL and TEL query messages, one per cycle, through a messenger
        built by messenger() or messengerFrom(). Uses a randomly selected witness when the
        message carries a witness list, otherwise a randomly selected endpoint of the target.

        Returns a Hio generator function that keeps servicing .msgs.

        Usage:
            add result of doify on this method to doers list
        """
        from .forwarding import introduce

        self.wind(tymth)
        self.tock = tock
        _ = (yield self.tock)

        rolePriority = (Roles.controller, Roles.agent, Roles.witness)

        while True:
            while not self.msgs:
                yield self.tock

            evt = self.msgs.popleft()
            pre = evt["pre"]
            target = evt["target"]
            src = evt["src"]
            route = evt["r"]
            query = evt["q"]
            wits = evt["wits"] if "wits" in evt else None

            # an explicit hab on the message wins over looking one up by the source prefix
            if "hab" in evt:
                hab = evt["hab"]
            else:
                hab = self.hby.habByPre(src)
                if hab is None:
                    continue

            if wits:
                witer = messenger(hab, random.choice(wits))
            else:
                if pre not in self.hby.kevers:
                    logger.error(f"must have KEL for identifier to query {pre}")
                    continue

                ends = hab.endsFor(pre=pre)
                role = next((candidate for candidate in rolePriority if candidate in ends), None)
                if role is None:
                    logger.error(f"unable query: can not find a valid role for {pre}")
                    continue

                end = ends[role]
                if len(end.items()) == 0:
                    logger.error(f"must have endpoint to query for pre={pre}")
                    continue

                ctrl, locs = random.choice(list(end.items()))
                if len(locs.items()) == 0:
                    logger.error(f"must have location in endpoint to query for pre={pre}")
                    continue

                witer = messengerFrom(hab=hab, pre=ctrl, urls=locs)

            # NOTE(review): this messenger is never removed again -- confirm whether that is intended
            self.extend([witer])

            msg = hab.query(target, src=witer.wit, route=route, query=query)  # Query for remote pre Event
            kel = introduce(hab, witer.wit)
            if kel:
                witer.msgs.append(bytearray(kel))
            witer.msgs.append(bytearray(msg))

            # block until the messenger reports its first sent entry, then record it
            while not witer.sent:
                yield self.tock

            self.sent.append(witer.sent.popleft())

            yield self.tock

    def query(self, pre, r="logs", sn='0', fn='0', src=None, hab=None, anchor=None, wits=None, **kwa):
        """
        Queue a KEL query (`qry`) request for the prefix (`pre`) on the internal .msgs
        buffer for processing by the .msgDo doer. May also contain an anchor to use to locate a key event.
        May specify the hab to use for signing and retrieving endpoint role records.

        Parameters:
            pre (str): qb64 identifier prefix being queried for
            r (str): query route
            sn (str): optional specific hex str of sequence number to query for
            fn (str): optional specific hex str of sequence number to start with
            src (str): qb64 identifier prefix of source of query
            hab (Hab): Hab to use instead of src, if provided, to retrieve endpoint role records from and to perform signing
            anchor (Seal): anchored Seal to search for in the query target
            wits (list): witnesses to query
        """
        qry = {"s": sn, "fn": fn}
        if anchor is not None:
            qry["a"] = anchor

        request = {"src": src, "pre": pre, "target": pre, "r": r, "q": qry, "wits": wits}
        if hab is not None:
            request["hab"] = hab

        self.msgs.append(request)

    def telquery(self, ri, src=None, i=None, r="tels", hab=None, pre=None, wits=None, **kwa):
        """
        Queue a TEL query request for a given registry, issuer and route against the target
        prefix (`pre`) on the internal .msgs buffer.
        May specify the hab to use for signing and retrieving endpoint role records.

        Parameters:
            ri (str): qb64 identifier prefix of the registry being queried
            src (str): qb64 identifier prefix of source of query
            i (str): qb64 identifier prefix of the issuer of the registry being queried
            r (str): query route
            hab (Hab): Hab to use instead of src, if provided, to retrieve endpoint role records from and to perform signing
            pre (str): qb64 identifier prefix of the target being queried
            wits (list): witnesses to query
        """
        request = {"src": src, "pre": pre, "target": i, "r": r, "wits": wits, "q": {"ri": ri}}
        if hab is not None:
            request["hab"] = hab

        self.msgs.append(request)
[docs]
class WitnessPublisher(doing.DoDoer):
    """
    Sends messages to all current witnesses of given identifier (from hab).

    For each queued message one messenger per witness is created, the message is handed to
    each of them, and the messengers are removed again once all of them are idle.
    Could be enhanced to have a `once` method that runs once and cleans up
    and an `all` method that runs and waits for more messages to receipt.
    """

    def __init__(self, hby, msgs=None, cues=None, **kwa):
        """Initialize with publish queue (msgs) and completion cues.

        Parameters:
            hby (Habery): Habery holding the habs whose witnesses receive the messages
            msgs (Deck): incoming messages to publish to witnesses;
                         each entry is read as {"pre": <str>, "msg": <bytes>}
            cues (Deck): outgoing cues of successful messages
        """
        self.hby = hby
        self.posted = 0  # number of accepted messages, compared against len(cues) by .idle
        self.msgs = msgs if msgs is not None else decking.Deck()
        self.cues = cues if cues is not None else decking.Deck()
        super(WitnessPublisher, self).__init__(doers=[doing.doify(self.sendDo)], **kwa)

    def sendDo(self, tymth=None, tock=0.0, **opts):
        """Doer loop that sends queued messages to each witness.

        Pushes the original request to self.cues to signal completion.
        Requests whose "pre" is not a local hab are dropped without a cue.

        Parameters:
            tymth (function): function returning cycle time for configuring this Doer's cycle time.
            tock (float): cycle time for this Doer, default is 0.0 seconds.
        """
        self.wind(tymth)
        self.tock = tock
        _ = (yield self.tock)

        while True:
            while self.msgs:
                evt = self.msgs.popleft()
                pre = evt["pre"]
                msg = evt["msg"]

                if pre not in self.hby.habs:
                    continue

                # Count only requests that will produce a cue; counting dropped ones would
                # leave .idle permanently False.
                self.posted += 1

                hab = self.hby.habs[pre]
                wits = hab.kever.wits

                witers = []
                for wit in wits:
                    witer = messenger(hab, wit)
                    witers.append(witer)
                    witer.msgs.append(bytearray(msg))  # make a copy so everyone munges their own
                    self.extend([witer])

                    _ = (yield self.tock)

                # Wait for every messenger without consuming the list, so that the
                # remove() below actually receives the messengers to drop.
                for witer in witers:
                    while not witer.idle:
                        _ = (yield self.tock)

                self.remove(witers)
                self.cues.push(evt)

                yield self.tock

            yield self.tock

    def sent(self, said):
        """ Check if message with given SAID was sent

        Parameters:
            said (str): qb64 SAID of message to check for

        Returns:
            bool: True when some cue carries a "said" entry equal to said
        """
        # cues are the original requests, which may not carry a "said" entry at all
        return any("said" in cue and cue["said"] == said for cue in self.cues)

    @property
    def idle(self):
        """True when nothing is queued and every accepted message has produced a cue."""
        return len(self.msgs) == 0 and self.posted == len(self.cues)
[docs]
class TCPMessenger(doing.DoDoer):
    """Send outbound CESR messages to a witness via TCP and parse inbound receipts."""

    def __init__(self, hab, wit, url, msgs=None, sent=None, doers=None, **kwa):
        """Initialize TCP messenger with queues and parser wiring.

        Parameters:
            hab (Hab): habitat for KEL parsing and db access.
            wit (str): qb64 witness identifier.
            url (str): tcp endpoint URL for the witness.
            msgs (Deck | None): outbound message queue.
            sent (Deck | None): sent message queue.
            doers (list | None): additional doers to run alongside the sender; not modified.
            **kwa: forwarded to the Kevery constructor.
        """
        self.hab = hab
        self.wit = wit
        self.url = url
        self.posted = 0  # number of messages taken off msgs for transmission
        self.msgs = msgs if msgs is not None else decking.Deck()
        self.sent = sent if sent is not None else decking.Deck()
        self.parser = None  # created in receiptDo once the client exists

        # copy so the caller's list is never mutated
        doers = list(doers) if doers is not None else []
        doers.extend([doing.doify(self.receiptDo)])

        self.kevery = eventing.Kevery(db=self.hab.db,
                                      **kwa)

        super(TCPMessenger, self).__init__(doers=doers)

    def receiptDo(self, tymth=None, tock=0.0, **kwa):
        """Doer loop that sends queued messages over TCP.

        Creates the TCP client and the parser over the client's receive buffer, adds the client
        doer and the parsing doer, then transmits queued messages one at a time, appending
        each to .sent after the transmit buffer has drained.

        Raises:
            ValueError: when the configured url does not use the tcp scheme
        """
        self.wind(tymth)
        self.tock = tock
        _ = (yield self.tock)

        up = urlparse(self.url)
        if up.scheme != Schemes.tcp:
            raise ValueError(f"invalid scheme {up.scheme} for TcpWitnesser")

        client = clienting.Client(host=up.hostname, port=up.port)
        self.parser = parsing.Parser(ims=client.rxbs,
                                     framed=True,
                                     kvy=self.kevery,
                                     version=Vrsn_1_0)

        clientDoer = clienting.ClientDoer(client=client)
        self.extend([clientDoer, doing.doify(self.msgDo)])

        while True:
            while not self.msgs:
                yield self.tock

            msg = self.msgs.popleft()
            self.posted += 1

            client.tx(msg)  # send to connected remote

            while client.txbs:
                yield self.tock

            self.sent.append(msg)
            yield self.tock

    def msgDo(self, tymth=None, tock=0.0, **opts):
        """Doer loop that parses inbound TCP messages into the Kevery."""
        yield from self.parser.parsator(local=True)  # process messages continuously

    @property
    def idle(self):
        """True when no message is queued and every posted message has been recorded as sent."""
        # Queued-but-unposted messages count as pending work, matching HTTPMessenger.idle.
        return len(self.msgs) == 0 and len(self.sent) == self.posted
[docs]
class TCPStreamMessenger(doing.DoDoer):
    """Stream a CESR message to a witness via TCP and parse inbound receipts."""

    def __init__(self, hab, wit, url, msgs=None, sent=None, doers=None, **kwa):
        """Initialize TCP stream messenger with queues and parser wiring.

        Parameters:
            hab (Hab): habitat for KEL parsing and db access.
            wit (str): qb64 witness identifier.
            url (str): tcp endpoint URL for the witness.
            msgs (Deck | None): outbound message queue.
            sent (Deck | None): sent message queue.
            doers (list | None): additional doers to run alongside the sender; not modified.
            **kwa: forwarded to the Kevery constructor.
        """
        self.hab = hab
        self.wit = wit
        self.url = url
        self.posted = 0  # number of messages taken off msgs for transmission
        self.msgs = msgs if msgs is not None else decking.Deck()
        self.sent = sent if sent is not None else decking.Deck()
        self.parser = None  # created in receiptDo once the client exists

        # copy so the caller's list is never mutated
        doers = list(doers) if doers is not None else []
        doers.extend([doing.doify(self.receiptDo)])

        self.kevery = eventing.Kevery(db=self.hab.db,
                                      **kwa)

        super(TCPStreamMessenger, self).__init__(doers=doers)

    def receiptDo(self, tymth=None, tock=0.0, **kwa):
        """Doer loop that sends queued messages over TCP.

        Pushes the original request to self.sent to signal completion

        Raises:
            ValueError: when the configured url does not use the tcp scheme
        """
        self.wind(tymth)
        self.tock = tock
        _ = (yield self.tock)

        up = urlparse(self.url)
        if up.scheme != Schemes.tcp:
            raise ValueError(f"invalid scheme {up.scheme} for TcpWitnesser")

        client = clienting.Client(host=up.hostname, port=up.port)
        self.parser = parsing.Parser(ims=client.rxbs,
                                     framed=True,
                                     kvy=self.kevery,
                                     version=Vrsn_1_0)

        clientDoer = clienting.ClientDoer(client=client)
        self.extend([clientDoer, doing.doify(self.msgDo)])

        while True:
            while not self.msgs:
                yield self.tock

            msg = self.msgs.popleft()
            self.posted += 1

            client.tx(msg)  # send to connected remote

            while client.txbs:
                yield self.tock

            self.sent.append(msg)
            yield self.tock

    def msgDo(self, tymth=None, tock=0.0, **opts):
        """Doer loop that parses inbound TCP messages into the Kevery."""
        yield from self.parser.parsator(local=True)  # process messages continuously

    @property
    def idle(self):
        """True when no message is queued and every posted message has been recorded as sent."""
        # Queued-but-unposted messages count as pending work, matching HTTPMessenger.idle.
        return len(self.msgs) == 0 and len(self.sent) == self.posted
[docs]
class HTTPMessenger(doing.DoDoer):
    """Send CESR messages to a witness over HTTP and capture responses."""

    def __init__(self, hab, wit, url, msgs=None, sent=None, doers=None, auth=None, **kwa):
        """Initialize HTTP messenger with queues and optional auth.

        Parameters:
            hab (Hab): habitat for KEL parsing and db access.
            wit (str): qb64 witness identifier.
            url (str): http/https endpoint URL for the witness.
            msgs (Deck | None): outbound message queue.
            sent (Deck | None): response queue.
            doers (list | None): additional doers to run alongside the messenger; not modified.
            auth (str | None): optional value sent as the Authorization header of each request.

        Raises:
            ValueError: when url uses neither the http nor the https scheme
        """
        self.hab = hab
        self.wit = wit
        self.posted = 0  # number of HTTP requests queued so far
        self.msgs = msgs if msgs is not None else decking.Deck()
        self.sent = sent if sent is not None else decking.Deck()
        self.parser = None
        self.auth = auth

        # copy so the caller's list is never mutated
        doers = list(doers) if doers is not None else []
        doers.extend([doing.doify(self.msgDo), doing.doify(self.responseDo)])

        up = urlparse(url)
        if up.scheme != Schemes.http and up.scheme != Schemes.https:
            raise ValueError(f"invalid scheme {up.scheme} for HTTPMessenger")

        self.client = http.clienting.Client(scheme=up.scheme, hostname=up.hostname, port=up.port)
        clientDoer = http.clienting.ClientDoer(client=self.client)

        doers.extend([clientDoer])

        super(HTTPMessenger, self).__init__(doers=doers, **kwa)

    def msgDo(self, tymth=None, tock=0.0, **kwa):
        """Doer loop that sends queued messages over HTTP.

        Each message is turned into requests by streamCESRRequests; the returned request count
        is added to .posted and the loop waits for the client's request queue to drain.
        """
        self.wind(tymth)
        self.tock = tock
        _ = (yield self.tock)

        while True:
            while not self.msgs:
                yield self.tock

            msg = self.msgs.popleft()
            headers = dict()
            if self.auth is not None:
                headers["Authorization"] = self.auth

            self.posted += streamCESRRequests(client=self.client, dest=self.wit, ims=msg, headers=headers)
            while self.client.requests:
                yield self.tock

            yield self.tock

    def responseDo(self, tymth=None, tock=0.0, **kwa):
        """Doer loop that processes HTTP responses from the client and adds them into `sent` cues."""
        self.wind(tymth)
        self.tock = tock
        _ = (yield self.tock)

        while True:
            while self.client.responses:
                rep = self.client.respond()
                self.sent.append(rep)
                yield
            yield

    @property
    def idle(self):
        """True when nothing is queued and one response has been captured per posted request."""
        return len(self.msgs) == 0 and self.posted == len(self.sent)
[docs]
class HTTPStreamMessenger(doing.DoDoer):
    """Stream a single CESR message via HTTP PUT and capture the response."""

    def __init__(self, hab, wit, url, msg=b'', headers=None, **kwa):
        """Initialize a single-request HTTP messenger and add the HTTP client Doer to this DoDoer.

        The PUT request for msg is queued on the client during construction.

        Parameters:
            hab (Hab): habitat for KEL parsing and db access.
            wit (str): qb64 witness identifier.
            url (str): http/https endpoint URL for the witness.
            msg (bytes): CESR message body to send.
            headers (dict | None): extra HTTP headers, appended after the CESR defaults.

        Raises:
            ValueError: when url uses neither the http nor the https scheme
        """
        self.hab = hab
        self.wit = wit
        self.rep = None  # set by recur once the response arrives
        headers = headers if headers is not None else {}

        up = urlparse(url)
        if up.scheme != Schemes.http and up.scheme != Schemes.https:
            raise ValueError(f"invalid scheme {up.scheme} for HTTPMessenger")

        self.client = http.clienting.Client(scheme=up.scheme, hostname=up.hostname, port=up.port)
        # keep a handle on the doer so recur can remove it once the response is in
        self.clientDoer = http.clienting.ClientDoer(client=self.client)

        headers = Hict([
            ("Content-Type", "application/cesr"),
            ("Content-Length", len(msg)),
            (CESR_DESTINATION_HEADER, self.wit),
        ] + list(headers.items()))

        self.client.request(
            method="PUT",
            path="/",
            headers=headers,
            body=bytes(msg)
        )

        doers = [self.clientDoer]

        super(HTTPStreamMessenger, self).__init__(doers=doers, **kwa)

    def recur(self, tyme, deeds=None):
        """Poll for a response and stop once received.

        Returns:
            True once a response has been stored in .rep, otherwise whatever the parent
            class recur returns.
        """
        if self.client.responses:
            self.rep = self.client.respond()
            # remove the client's doer (the object that was added), not the client itself
            self.remove([self.clientDoer])
            return True

        return super(HTTPStreamMessenger, self).recur(tyme, deeds)
[docs]
def mailbox(hab, cid):
    """
    Finds and returns a mailbox, if any, based on the provided Hab and controller AID (cid).

    The first allowed mailbox endpoint role record for cid wins; otherwise a random witness
    of cid is used when its key state and witnesses are known locally.

    Returns:
        str | None: qb64 identifier prefix of the mailbox to use for the controller, or None if no mailbox found.

    Parameters:
        hab (Hab): Hab whose database and kevers are consulted
        cid (str): qb64 identifier prefix of controller to find mailbox for
    """
    records = hab.db.ends.getTopItemIter(keys=(cid, Roles.mailbox))
    for (_, _role, eid), record in records:
        if record.allowed:
            return eid

    # no explicit mailbox: fall back to one of the controller's witnesses
    if cid not in hab.kevers:
        return None

    wits = hab.kevers[cid].wits
    return random.choice(wits) if wits else None
[docs]
def messenger(hab, pre, auth=None):
    """ Create a Messenger (tcp or http) from the endpoints known for a recipient

    Parameters:
        hab (Habitat): Environment to use to look up the recipient's URLs
        pre (str): qb64 identifier prefix of recipient to create a messenger for
        auth (str): optional auth code to send with any request for messenger

    Returns:
        TCPMessenger | HTTPMessenger: whatever messengerFrom builds for the fetched URLs
    """
    return messengerFrom(hab, pre, hab.fetchUrls(eid=pre), auth)
[docs]
def messengerFrom(hab, pre, urls, auth=None):
    """ Create a Messenger (tcp or http) based on provided endpoints

    https is preferred over http, and either of them over tcp.

    Parameters:
        hab (Habitat): Environment handed to the created messenger
        pre (str): qb64 identifier prefix of recipient to create a messenger for
        urls (dict): map of schemes to urls of available endpoints
        auth (str): optional auth code to send with any request for messenger (http/https only)

    Returns:
        HTTPMessenger | TCPMessenger: messenger bound to the selected endpoint

    Raises:
        ConfigurationError: when urls holds no http, https or tcp endpoint
    """
    for scheme in (Schemes.https, Schemes.http):
        if scheme in urls:
            return HTTPMessenger(hab=hab, wit=pre, url=urls[scheme], auth=auth)

    if Schemes.tcp in urls:
        return TCPMessenger(hab=hab, wit=pre, url=urls[Schemes.tcp])

    raise ConfigurationError(f"unable to find a valid endpoint for witness {pre}")
[docs]
def streamMessengerFrom(hab, pre, urls, msg, headers=None):
    """Create a stream messenger (HTTP or TCP) for a single outbound message.

    https is preferred over http, and either of them over tcp.

    Parameters:
        hab (Habitat): Environment handed to the created messenger
        pre (str): qb64 identifier prefix of recipient to create a messenger for
        urls (dict): map of schemes to urls of available endpoints
        msg (bytes): bytes of message to send
        headers (dict): optional headers to send with HTTP requests (ignored for tcp)

    Returns:
        HTTPStreamMessenger | TCPStreamMessenger: messenger with msg already queued

    Raises:
        ConfigurationError: when urls holds no http, https or tcp endpoint
    """
    if Schemes.http in urls or Schemes.https in urls:
        url = urls[Schemes.https] if Schemes.https in urls else urls[Schemes.http]
        witer = HTTPStreamMessenger(hab=hab, wit=pre, url=url, msg=msg, headers=headers)
    elif Schemes.tcp in urls:
        url = urls[Schemes.tcp]
        witer = TCPStreamMessenger(hab=hab, wit=pre, url=url)
        # The TCP messenger takes no message at construction; queue it explicitly,
        # otherwise nothing would ever be sent.
        witer.msgs.append(bytearray(msg))
    else:
        raise ConfigurationError(f"unable to find a valid endpoint for witness {pre}")

    return witer
[docs]
def httpClient(hab, wit):
    """ Create and return a http.client and http.ClientDoer for the witness

    The https endpoint is used when one is registered, otherwise the http endpoint.

    Parameters:
        hab (Habitat): Environment to use to look up witness URLs
        wit (str): qb64 identifier prefix of witness for which to create a client

    Returns:
        Client: Http client for connecting to remote identifier
        ClientDoer: Doer for client

    Raises:
        MissingEntryError: when the witness has neither an https nor an http endpoint
    """
    urls = hab.fetchUrls(eid=wit, scheme=Schemes.https)
    if not urls:
        urls = hab.fetchUrls(eid=wit, scheme=Schemes.http)
    if not urls:
        raise MissingEntryError(f"unable to query witness {wit}, no http endpoint")

    scheme = Schemes.https if Schemes.https in urls else Schemes.http
    parts = urlparse(urls[scheme])

    client = http.clienting.Client(scheme=parts.scheme, hostname=parts.hostname, port=parts.port, path=parts.path)
    return client, http.clienting.ClientDoer(client=client)
[docs]
def schemes(db, eids):
    """
    Creates a message bytearray of reply messages containing location records (URLs) and their
    stored signatures for a given list of authorized endpoint role AIDs (eids).

    For every eid and every scheme the location record SAID is looked up in db.lans; the
    matching reply (db.rpys) is serialized together with its single stored cigar (db.scgs).
    Records without a stored reply or without exactly one cigar are skipped with a warning.

    Returns:
        a bytearray of reply messages and their signatures

    Parameters:
        db (Baser): Hab database used to retrieve location records and signatures
        eids (list): list of endpoint role AIDs (eids) to retrieve location records and signatures for
    """
    msgs = bytearray()
    for eid in eids:
        for scheme in Schemes:
            said = db.lans.get(keys=(eid, scheme))
            if said is None:
                continue

            serder = db.rpys.get(keys=(said.qb64,))
            cigars = db.scgs.get(keys=(said.qb64,))

            # NOTE(review): the previous code passed cigars=[None] to messagize in this case;
            # presumably that cannot be serialized, so skip the record instead -- confirm.
            if serder is None or len(cigars) != 1:
                logger.warning(f"skipping location record {said.qb64} for {eid}: "
                               f"missing reply or not exactly one signature")
                continue

            (verfer, cigar) = cigars[0]
            cigar.verfer = verfer  # messagize needs the verifier attached to the cigar
            msgs.extend(eventing.messagize(serder=serder,
                                           cigars=[cigar],
                                           framed=False))
    return msgs