# -*- encoding: utf-8 -*-
"""
KERI
keri.app.directing module
simple direct mode demo support classes
"""
import itertools
from hio.base import doing
from hio.help import ogler
from .. import Vrsn_1_0
from ..core import Kevery, Revery, Parser
from ..vdr import Tevery
logger = ogler.getLogger()
[docs]
class Director(doing.Doer):
"""Base class for Direct Mode KERI Controller Doer with habitat and TCP client.
Attributes:
hab (Habitat): Local controller's Habitat instance.
client (Client): hio TCP client instance. Assumed to be operated
by a separate doer.
"""
[docs]
def __init__(self, hab, client, **kwa):
"""Initialize instance.
Args:
tymist (Tymist): Tymist instance.
tock (float): Seconds initial value of .tock.
hab (Habitat): Habitat instance.
client (Client): TCP Client instance. Assumes opened/closed elsewhere.
"""
super(Director, self).__init__(**kwa)
self.hab = hab
self.client = client # use client to initiate comms
if self.tymth:
self.client.wind(self.tymth)
[docs]
def wind(self, tymth):
"""
Inject new tymist.tymth as new ._tymth. Changes tymist.tyme base.
Updates winds .tymer .tymth
"""
super(Director, self).wind(tymth)
self.client.wind(tymth)
[docs]
def sendOwnEvent(self, sn):
"""
Utility to send own event at sequence number sn
"""
msg = self.hab.msgOwnEvent(sn=sn, framed=True)
# send to connected remote
self.client.tx(msg)
logger.info("%s: %s sent event:\n%s\n\n", self.hab.name, self.hab.pre, bytes(msg))
[docs]
def sendOwnInception(self):
"""
Utility to send own inception on client
"""
self.sendOwnEvent(sn=0)
[docs]
class Reactor(doing.DoDoer):
"""
Reactor Subclass of DoDoer with doers list from do generator methods: .msgDo, .cueDo, and .escrowDo.
Enables continuous scheduling of doers (do generator instances or functions)
Implements Doist like functionality to allow nested scheduling of doers.
Each DoDoer runs a list of doers like a Doist but using the tyme from its
injected tymist as injected by its parent DoDoer or Doist.
Scheduling hierarchy: Doist->DoDoer...->DoDoer->Doers
Inherited Attributes:
.done is Boolean completion state:
True means completed
Otherwise incomplete. Incompletion maybe due to close or abort.
.opts is dict of injected options for its generator .do
.doers is list of Doers or Doer like generator functions
Attributes:
hab (Habitat): Local controller's Habitat instance.
client (TCP Client): TCP client used for both receive and transmit.
verifier (Verifier): Optional Verifier instance for TEL context.
None if TEL processing is not required.
exc: Optional Exchanger instance for peer-to-peer key-event exchange
messages. None if not required.
direct (bool): True means direct mode; cue'd receipts are processed
immediately. False means indirect mode; cue'd receipts are skipped.
kevery (Kevery): Event processor for incoming key events.
tvy (Tevery): Event processor for incoming transaction events.
None when verifier is None.
parser (Parser): Stream parser bound to client.rxbs.
done (bool): Completion state set by DoDoer. True means completed
normally. False or None means incomplete.
opts (dict): Injected options passed to the .do generator.
doers (list): Scheduled Doer instances or generator functions.
Inherited Properties:
.tyme is float relative cycle time of associated Tymist .tyme obtained
via injected .tymth function wrapper closure.
.tymth is function wrapper closure returned by Tymist .tymeth() method.
When .tymth is called it returns associated Tymist .tyme.
.tymth provides injected dependency on Tymist tyme base.
.tock is float, desired time in seconds between runs or until next run,
non negative, zero means run asap
Properties:
Inherited Methods:
.wind injects ._tymth dependency from associated Tymist to get its .tyme
.__call__ makes instance callable
Appears as generator function that returns generator
.do is generator method that returns generator
.enter is enter context action method
.recur is recur context action method or generator method
.clean is clean context action method
.exit is exit context method
.close is close context method
.abort is abort context method
Overidden Methods:
Hidden:
._tymth is injected function wrapper closure returned by .tymen() of
associated Tymist instance that returns Tymist .tyme. when called.
._tock is hidden attribute for .tock property
"""
[docs]
def __init__(self, hab, client, verifier=None, exchanger=None, direct=True, doers=None, **kwa):
"""Initialize instance and extend doers with msgDo, escrowDo, cueDo.
Args:
hab (Habitat): Local controller's Habitat instance.
client (TCP Client): TCP client used for both receive and transmit.
verifier (Verifier, optional): Verifier instance providing TEL
context. When provided a Tevery is created and bound to the
parser. Defaults to None.
exchanger: optional Exchanger instance for exn
message processing. Defaults to None.
direct (bool, optional): True to process cue'd receipts in direct
mode. False to skip cue'd receipt processing. Defaults to True.
doers (list, optional): Initial list of Doer instances or generator
functions to schedule. msgDo, escrowDo, and cueDo are always
appended. Defaults to None.
**kwa: Additional keyword arguments forwarded to DoDoer.__init__.
"""
self.hab = hab
self.client = client # use client for both rx and tx
self.verifier = verifier
self.exc = exchanger
self.direct = True if direct else False
doers = doers if doers is not None else []
doers.extend([doing.doify(self.msgDo),
doing.doify(self.escrowDo),
doing.doify(self.cueDo)])
self.kevery = Kevery(db=self.hab.db,
lax=False,
local=False,
direct=self.direct)
if self.verifier is not None:
self.tvy = Tevery(reger=self.verifier.reger,
db=self.hab.db,
local=False)
else:
self.tvy = None
self.parser = Parser(ims=self.client.rxbs,
framed=True,
kvy=self.kevery,
tvy=self.tvy,
exc=self.exc,
version=Vrsn_1_0)
super(Reactor, self).__init__(doers=doers, **kwa)
if self.tymth:
self.client.wind(self.tymth)
[docs]
def wind(self, tymth):
"""Inject a new tymth closure and propagate it to the TCP client.
Overrides DoDoer.wind to ensure client.wind is called whenever the
Tymist dependency changes.
Args:
tymth (callable): Closure returned by Tymist.tymeth() that, when
called, returns the current Tymist.tyme.
"""
super(Reactor, self).wind(tymth)
self.client.wind(tymth)
[docs]
def msgDo(self, tymth=None, tock=0.0, **opts):
"""Doer that continuously parses the incoming TCP message stream.
Delegates to Parser.parsator, which reads from client.rxbs and feeds
events to kevery (and tvy when present).
Args:
tymth (callable, optional): Injected tymth closure from the Doist.
Defaults to None.
tock (float, optional): Injected initial tock value in seconds.
Defaults to 0.0.
**opts: Additional injected options from the Doist.
Yields:
None: Yields control back to the scheduler on each cycle.
Returns:
bool: Done state from Parser.parsator. Only reached on forced close.
"""
yield # enter context
if self.parser.ims:
logger.info("Client %s received:\n%s\n...\n", self.hab.name, self.parser.ims[:1024])
done = yield from self.parser.parsator(local=True) # process messages continuously
return done # should nover get here except forced close
[docs]
def cueDo(self, tymth=None, tock=0.0, **opts):
"""Doer that drains kevery.cues and sends resulting receipt messages.
In each cycle, iterates hab.processCuesIter over kevery.cues and
transmits each produced message via sendMessage. Yields after each
message to throttle output, then yields again at end of each cycle.
Args:
tymth (callable, optional): Injected tymth closure from the Doist.
Defaults to None.
tock (float, optional): Injected initial tock value in seconds.
Defaults to 0.0.
**opts: Additional injected options from the Doist.
Yields:
None: Yields control back to the scheduler on each cycle.
Returns:
bool: Always False. Only reached on forced close.
"""
yield # enter context
while True:
for msg in self.hab.processCuesIter(self.kevery.cues):
self.sendMessage(msg, label="chit or receipt")
yield # throttle just do one cue at a time
yield
return False # should never get here except forced close
[docs]
def escrowDo(self, tymth=None, tock=0.0, **opts):
"""Doer that processes escrowed events on every cycle.
Calls kevery.processEscrows() each cycle and, when tvy is present,
also calls tvy.processEscrows().
Args:
tymth (callable, optional): Injected tymth closure from the Doist.
Defaults to None.
tock (float, optional): Injected initial tock value in seconds.
Defaults to 0.0.
**opts: Additional injected options from the Doist.
Yields:
None: Yields control back to the scheduler on each cycle.
Returns:
bool: Always False. Only reached on forced close.
"""
yield # enter context
while True:
self.kevery.processEscrows()
if self.tvy is not None:
self.tvy.processEscrows()
yield
return False # should never get here except forced close
[docs]
def sendMessage(self, msg, label=""):
"""Transmit a message over the TCP client and log it.
Args:
msg (bytes): Serialized message to transmit.
label (str, optional): Descriptive label used in the log line.
Defaults to empty string.
"""
self.client.tx(msg) # send to remote
logger.info("%s sent %s:\n%s\n\n", self.hab.name, label, bytes(msg))
[docs]
class Directant(doing.DoDoer):
"""Subclass of DoDoer that accepts TCP connections and manages Reactants.
Responds to initiated connections from a remote Director by creating and
running a Reactant per connection and scheduling it as a live doer. Connections
that are cut off or whose timer has expired are closed and their Reactants
removed. Only one scheduled doer is added directly: serviceDo.
Part of the scheduling hierarchy: Doist -> DoDoer -> ... -> Doers.
Inherits the tyme/tymth injected-dependency system from DoDoer; call
.wind() to inject a Tymist before use.
Attributes:
hab (Habitat): Local controller's Habitat instance.
verifier (Verifier): Optional Verifier for TEL context processing.
None if TEL processing is not required.
exchanger: Optional Exchanger for exn message processing.
None if not required.
server (TCP Server): TCP server instance, operated by a separate doer.
rants (dict): Active Reactant instances keyed by connection address.
done (bool): Completion state set by DoDoer. True means completed
normally. False or None means incomplete.
opts (dict): Injected options passed to the .do generator.
doers (list): Scheduled Doer instances or generator functions.
"""
[docs]
def __init__(self, hab, server, verifier=None, exchanger=None, doers=None, **kwa):
"""Initialize instance and extend doers with serviceDo.
Args:
hab (Habitat): Local controller's Habitat instance.
server (TCP Server): TCP server instance used to accept and
track inbound connections.
verifier (Verifier, optional): Verifier instance providing TEL
context. Forwarded to each spawned Reactant. Defaults to None.
exchanger: optional Exchanger instance for exn
message processing. Forwarded to each spawned Reactant.
Defaults to None.
doers (list, optional): Initial list of Doer instances or generator
functions to schedule. serviceDo is always appended.
Defaults to None.
**kwa: Additional keyword arguments forwarded to DoDoer.__init__.
"""
self.hab = hab
self.verifier = verifier
self.exchanger = exchanger
self.server = server # use server for cx
self.rants = dict()
doers = doers if doers is not None else []
doers.extend([doing.doify(self.serviceDo)])
super(Directant, self).__init__(doers=doers, **kwa)
if self.tymth:
self.server.wind(self.tymth)
[docs]
def wind(self, tymth):
"""Inject a new tymth closure and propagate it to the TCP server.
Overrides DoDoer.wind to ensure server.wind is called whenever the
Tymist dependency changes.
Args:
tymth (callable): Closure returned by Tymist.tymeth() that, when
called, returns the current Tymist.tyme.
"""
super(Directant, self).wind(tymth)
self.server.wind(tymth)
[docs]
def serviceDo(self, tymth=None, tock=0.0, **opts):
"""Doer that services inbound connections and manages Reactant lifecycle.
Each cycle iterates server.ixes. For each connection address:
- If the connection is cut off, closeConnection is called.
- If no Reactant exists for the address yet, one is created and
extended into the running doers via self.extend.
- If the connection has a positive tymeout and its timer has expired,
closeConnection is called.
Args:
tymth (callable, optional): Injected tymth closure from the Doist.
Defaults to None.
tock (float, optional): Injected initial tock value in seconds.
Defaults to 0.0.
**opts: Additional injected options from the Doist.
Yields:
None: Yields control back to the scheduler on each cycle.
"""
yield # enter context
while True:
for ca, ix in list(self.server.ixes.items()):
if ix.cutoff:
self.closeConnection(ca)
continue
if ca not in self.rants: # create Reactant and extend doers with it
rant = Reactant(tymth=self.tymth, hab=self.hab, verifier=self.verifier,
exchanger=self.exchanger, remoter=ix)
self.rants[ca] = rant
# add Reactant (rant) doer to running doers
self.extend(doers=[rant]) # open and run rant as doer
if ix.tymeout > 0.0 and ix.tymer.expired:
self.closeConnection(ca) # also removes rant
yield
[docs]
def closeConnection(self, ca):
"""Flush, close, and clean up a connection and its associated Reactant.
Flushes pending send bytes on the remoter before removing it from the
server. If a Reactant exists for the address, it is closed and removed
from the doers list.
Args:
ca (tuple): Connection address key used in server.ixes and rants.
"""
if ca in self.server.ixes: # remoter still there
self.server.ixes[ca].serviceSends() # send final bytes to socket
self.server.removeIx(ca)
if ca in self.rants: # remove rant (Reactant) if any
self.remove([self.rants[ca]]) # close and remove rant from doers list
del self.rants[ca]
[docs]
class Reactant(doing.DoDoer):
"""Subclass of DoDoer that processes incoming KERI message streams from a TCP remoter.
Wires together a TCP remoter, a Kevery (and optionally a Tevery), a Revery,
and a Parser into three continuously-scheduled doers: msgDo, cueDo, and
escrowDo. Each Reactant instance owns its own Kevery and parser bound to
the remoter's receive buffer, so multiple simultaneous remote connections
each get independent processing state. Part of the scheduling hierarchy:
Doist -> DoDoer -> ... -> Doers. Inherits the tyme/tymth injected-dependency
system from DoDoer; call .wind() to inject a Tymist before use.
Attributes:
hab (Habitat): Local controller's Habitat instance.
verifier (Verifier): Optional Verifier instance for TEL context.
None if TEL processing is not required.
exchanger: Optional Exchanger instance for exn message processing.
None if not required.
remoter (TCP Remoter): TCP remoter used for both receive and transmit.
kevery (Kevery): Event processor for incoming key events.
tevery (Tevery): Event processor for incoming transaction events.
None when verifier is None.
parser (Parser): Stream parser bound to remoter.rxbs.
done (bool): Completion state set by DoDoer. True means completed
normally. False or None means incomplete.
opts (dict): Injected options passed to the .do generator.
doers (list): Scheduled Doer instances or generator functions.
"""
[docs]
def __init__(self, hab, remoter, verifier=None, exchanger=None, doers=None, **kwa):
"""Initialize instance and extend doers with msgDo, cueDo, escrowDo.
A Revery is always created and its router is registered on both
kevery and, when verifier is provided, tevery.
Args:
hab (Habitat): Local controller's Habitat instance.
remoter (TCP Remoter): TCP remoter used for both receive and
transmit.
verifier (Verifier, optional): Verifier instance providing TEL
context. When provided a Tevery is created, bound to the
parser, and its reply routes are registered. Defaults to None.
exchanger: optional Exchanger instance for exn
message processing. Defaults to None.
doers (list, optional): Initial list of Doer instances or generator
functions to schedule. msgDo, cueDo, and escrowDo are always
appended. Defaults to None.
**kwa: Additional keyword arguments forwarded to DoDoer.__init__.
"""
self.hab = hab
self.verifier = verifier
self.exchanger = exchanger
self.remoter = remoter # use remoter for both rx and tx
doers = doers if doers is not None else []
doers.extend([doing.doify(self.msgDo),
doing.doify(self.cueDo),
doing.doify(self.escrowDo)])
# needs unique kevery with ims per remoter connnection
rvy = Revery(db=hab.db)
self.kevery = Kevery(db=self.hab.db,
lax=False,
local=False,
rvy=rvy)
if self.verifier is not None:
self.tevery = Tevery(reger=self.verifier.reger,
db=self.hab.db,
local=False, rvy=rvy)
self.tevery.registerReplyRoutes(router=rvy.rtr)
else:
self.tevery = None
self.kevery.registerReplyRoutes(router=rvy.rtr)
self.parser = Parser(ims=self.remoter.rxbs,
framed=True,
kvy=self.kevery,
tvy=self.tevery,
exc=self.exchanger,
rvy=rvy,
version=Vrsn_1_0)
super(Reactant, self).__init__(doers=doers, **kwa)
if self.tymth:
self.remoter.wind(self.tymth)
[docs]
def wind(self, tymth):
"""Inject a new tymth closure and propagate it to the TCP remoter.
Overrides DoDoer.wind to ensure remoter.wind is called whenever the
Tymist dependency changes.
Args:
tymth (callable): Closure returned by Tymist.tymeth() that, when
called, returns the current Tymist.tyme.
"""
super(Reactant, self).wind(tymth)
self.remoter.wind(tymth)
[docs]
def msgDo(self, tymth=None, tock=0.0, **opts):
"""Doer that continuously parses the incoming TCP message stream.
Delegates to Parser.parsator, which reads from remoter.rxbs and feeds
events to kevery (and tevery when present).
Args:
tymth (callable, optional): Injected tymth closure from the Doist.
Defaults to None.
tock (float, optional): Injected initial tock value in seconds.
Defaults to 0.0.
**opts: Additional injected options from the Doist.
Yields:
None: Yields control back to the scheduler on each cycle.
Returns:
bool: Done state from Parser.parsator. Only reached on forced close.
"""
yield # enter context
if self.parser.ims:
logger.info("Server %s: received:\n%s\n...\n", self.hab.name,
self.parser.ims[:1024])
done = yield from self.parser.parsator(local=True) # process messages continuously
return done # should nover get here except forced close
[docs]
def cueDo(self, tymth=None, tock=0.0, **opts):
"""Doer that drains kevery.cues and sends resulting receipt messages.
In each cycle, iterates hab.processCuesIter over kevery.cues. Each
produced message is coerced to bytearray if it arrives as a list of
chunks, then transmitted via sendMessage. Yields after each message
to throttle output, then yields again at end of each cycle.
Args:
tymth (callable, optional): Injected tymth closure from the Doist.
Defaults to None.
tock (float, optional): Injected initial tock value in seconds.
Defaults to 0.0.
**opts: Additional injected options from the Doist.
Yields:
None: Yields control back to the scheduler on each cycle.
Returns:
bool: Always False. Only reached on forced close.
"""
yield # enter context
while True:
for msg in self.hab.processCuesIter(self.kevery.cues):
if isinstance(msg, list):
msg = bytearray(itertools.chain(*msg))
self.sendMessage(msg, label="chit or receipt or replay")
yield # throttle just do one cue at a time
yield
return False # should never get here except forced close
[docs]
def escrowDo(self, tymth=None, tock=0.0, **opts):
"""Doer that processes escrowed events on every cycle.
Calls kevery.processEscrows() each cycle and, when tevery is present,
also calls tevery.processEscrows().
Args:
tymth (callable, optional): Injected tymth closure from the Doist.
Defaults to None.
tock (float, optional): Injected initial tock value in seconds.
Defaults to 0.0.
**opts: Additional injected options from the Doist.
Yields:
None: Yields control back to the scheduler on each cycle.
Returns:
bool: Always False. Only reached on forced close.
"""
yield # enter context
while True:
self.kevery.processEscrows()
if self.tevery is not None:
self.tevery.processEscrows()
yield
return False # should never get here except forced close
[docs]
def sendMessage(self, msg, label=""):
"""Transmit a message over the TCP remoter and log it.
Args:
msg (bytes): Serialized message to transmit.
label (str, optional): Descriptive label used in the log line.
Defaults to empty string.
"""
self.remoter.tx(msg) # send to remote
logger.info("Server %s: sent %s:\n%d\n\n", self.hab.name,
label, len(msg))
[docs]
def runController(doers, expire=0.0):
"""
Utiitity Function to create doist to run doers
"""
tock = 0.03125
doist = doing.Doist(limit=expire, tock=tock, real=True)
doist.do(doers=doers)