Source code for keri.app.directing

# -*- encoding: utf-8 -*-
"""
KERI
keri.app.directing module

simple direct mode demo support classes
"""
import itertools
from hio.base import doing

from .. import help
from ..core import eventing, routing
from ..core import parsing
from ..vdr.eventing import Tevery

logger = help.ogler.getLogger()


[docs] class Director(doing.Doer): """ Base class for Direct Mode KERI Controller Doer with habitat and TCP Client Attributes: hab (Habitat: local controller's context client (serving.Client): hio TCP client instance. Assumes operated by another doer. Inherited Properties: tyme (float): relative cycle time of associated Tymist, obtained via injected .tymth function wrapper closure. tymth (function): function wrapper closure returned by Tymist .tymeth() method. When .tymth is called it returns associated Tymist .tyme. .tymth provides injected dependency on Tymist tyme base. tock (float): desired time in seconds between runs or until next run, non negative, zero means run asap Properties: Inherited Methods: .__call__ makes instance callable return generator .do is generator function returns generator Methods: Hidden: ._tymth is injected function wrapper closure returned by .tymen() of associated Tymist instance that returns Tymist .tyme. when called. ._tock is hidden attribute for .tock property """
[docs] def __init__(self, hab, client, **kwa): """ Initialize instance. Inherited Parameters: tymist is Tymist instance tock is float seconds initial value of .tock Parameters: hab is Habitat instance client is TCP Client instance. Assumes opened/closed elsewhere """ super(Director, self).__init__(**kwa) self.hab = hab self.client = client # use client to initiate comms if self.tymth: self.client.wind(self.tymth)
[docs] def wind(self, tymth): """ Inject new tymist.tymth as new ._tymth. Changes tymist.tyme base. Updates winds .tymer .tymth """ super(Director, self).wind(tymth) self.client.wind(tymth)
[docs] def sendOwnEvent(self, sn): """ Utility to send own event at sequence number sn """ msg = self.hab.makeOwnEvent(sn=sn) # send to connected remote self.client.tx(msg) logger.info("%s: %s sent event:\n%s\n\n", self.hab.name, self.hab.pre, bytes(msg))
[docs] def sendOwnInception(self): """ Utility to send own inception on client """ self.sendOwnEvent(sn=0)
[docs] class Reactor(doing.DoDoer): """ Reactor Subclass of DoDoer with doers list from do generator methods: .msgDo, .cueDo, and .escrowDo. Enables continuous scheduling of doers (do generator instances or functions) Implements Doist like functionality to allow nested scheduling of doers. Each DoDoer runs a list of doers like a Doist but using the tyme from its injected tymist as injected by its parent DoDoer or Doist. Scheduling hierarchy: Doist->DoDoer...->DoDoer->Doers Inherited Attributes: .done is Boolean completion state: True means completed Otherwise incomplete. Incompletion maybe due to close or abort. .opts is dict of injected options for its generator .do .doers is list of Doers or Doer like generator functions Attributes: .hab is Habitat instance of local controller's context .client is TCP Client instance. .kevery is Kevery instance Inherited Properties: .tyme is float relative cycle time of associated Tymist .tyme obtained via injected .tymth function wrapper closure. .tymth is function wrapper closure returned by Tymist .tymeth() method. When .tymth is called it returns associated Tymist .tyme. .tymth provides injected dependency on Tymist tyme base. .tock is float, desired time in seconds between runs or until next run, non negative, zero means run asap Properties: Inherited Methods: .wind injects ._tymth dependency from associated Tymist to get its .tyme .__call__ makes instance callable Appears as generator function that returns generator .do is generator method that returns generator .enter is enter context action method .recur is recur context action method or generator method .clean is clean context action method .exit is exit context method .close is close context method .abort is abort context method Overidden Methods: Hidden: ._tymth is injected function wrapper closure returned by .tymen() of associated Tymist instance that returns Tymist .tyme. when called. ._tock is hidden attribute for .tock property """
[docs] def __init__(self, hab, client, verifier=None, exchanger=None, direct=True, doers=None, **kwa): """ Initialize instance. Inherited Parameters: tymist is Tymist instance tock is float seconds initial value of .tock doers is list of doers (do generator instances, functions or methods) Parameters: hab is Habitat instance of local controller's context client is TCP Client instance verifier is Verifier instance of local controller's TEL context direct is Boolean, True means direct mode so process cue'd receipts False means indirect mode so don't process cue'ed receipts """ self.hab = hab self.client = client # use client for both rx and tx self.verifier = verifier self.exc = exchanger self.direct = True if direct else False doers = doers if doers is not None else [] doers.extend([doing.doify(self.msgDo), doing.doify(self.escrowDo), doing.doify(self.cueDo)]) self.kevery = eventing.Kevery(db=self.hab.db, lax=False, local=False, direct=self.direct) if self.verifier is not None: self.tvy = Tevery(reger=self.verifier.reger, db=self.hab.db, local=False) else: self.tvy = None self.parser = parsing.Parser(ims=self.client.rxbs, framed=True, kvy=self.kevery, tvy=self.tvy, exc=self.exc) super(Reactor, self).__init__(doers=doers, **kwa) if self.tymth: self.client.wind(self.tymth)
[docs] def wind(self, tymth): """ Inject new tymist.tymth as new ._tymth. Changes tymist.tyme base. Updates winds .tymer .tymth """ super(Reactor, self).wind(tymth) self.client.wind(tymth)
[docs] def msgDo(self, tymth=None, tock=0.0, **opts): """ Returns doifiable Doist compatibile generator method (doer dog) to process incoming message stream of .kevery Doist Injected Attributes: g.tock = tock # default tock attributes g.done = None # default done state g.opts Parameters: tymth is injected function wrapper closure returned by .tymen() of Tymist instance. Calling tymth() returns associated Tymist .tyme. tock is injected initial tock value opts is dict of injected optional additional parameters Usage: add result of doify on this method to doers list """ yield # enter context if self.parser.ims: logger.info("Client %s received:\n%s\n...\n", self.hab.name, self.parser.ims[:1024]) done = yield from self.parser.parsator() # process messages continuously return done # should nover get here except forced close
[docs] def cueDo(self, tymth=None, tock=0.0, **opts): """ Returns doifiable Doist compatibile generator method (doer dog) to process .kevery.cues deque Doist Injected Attributes: g.tock = tock # default tock attributes g.done = None # default done state g.opts Parameters: tymth is injected function wrapper closure returned by .tymen() of Tymist instance. Calling tymth() returns associated Tymist .tyme. tock is injected initial tock value opts is dict of injected optional additional parameters Usage: add result of doify on this method to doers list """ yield # enter context while True: for msg in self.hab.processCuesIter(self.kevery.cues): self.sendMessage(msg, label="chit or receipt") yield # throttle just do one cue at a time yield return False # should never get here except forced close
[docs] def escrowDo(self, tymth=None, tock=0.0, **opts): """ Returns doifiable Doist compatibile generator method (doer dog) to process .kevery escrows. Doist Injected Attributes: g.tock = tock # default tock attributes g.done = None # default done state g.opts Parameters: tymth is injected function wrapper closure returned by .tymen() of Tymist instance. Calling tymth() returns associated Tymist .tyme. tock is injected initial tock value opts is dict of injected optional additional parameters Usage: add result of doify on this method to doers list """ yield # enter context while True: self.kevery.processEscrows() if self.tvy is not None: self.tvy.processEscrows() yield return False # should never get here except forced close
[docs] def sendMessage(self, msg, label=""): """ Sends message msg and loggers label if any """ self.client.tx(msg) # send to remote logger.info("%s sent %s:\n%s\n\n", self.hab.name, label, bytes(msg))
[docs] class Directant(doing.DoDoer): """ Directant class with TCP Server. Responds to initiated connections from a remote Director by creating and running a Reactant per connection. Each Reactant has TCP remoter. Directant Subclass of DoDoer with doers list from do generator methods: .serviceDo Enables continuous scheduling of doers (do generator instances or functions) Implements Doist like functionality to allow nested scheduling of doers. Each DoDoer runs a list of doers like a Doist but using the tyme from its injected tymist as injected by its parent DoDoer or Doist. Scheduling hierarchy: Doist->DoDoer...->DoDoer->Doers Inherited Attributes: .done is Boolean completion state: True means completed Otherwise incomplete. Incompletion maybe due to close or abort. .opts is dict of injected options for its generator .do .doers is list of Doers or Doer like generator functions Attributes: .hab is Habitat instance of local controller's context .server is TCP client instance. Assumes operated by another doer. .rants is dict of Reactants indexed by connection address Inherited Properties: .tyme is float relative cycle time of associated Tymist .tyme obtained via injected .tymth function wrapper closure. .tymth is function wrapper closure returned by Tymist .tymeth() method. When .tymth is called it returns associated Tymist .tyme. .tymth provides injected dependency on Tymist tyme base. .tock is desired time in seconds between runs or until next run, non negative, zero means run asap Properties: Inherited Methods: .wind injects ._tymth dependency from associated Tymist to get its .tyme .__call__ makes instance callable Appears as generator function that returns generator .do is generator method that returns generator .enter is enter context action method .recur is recur context action method or generator method .clean is clean context action method .exit is exit context method .close is close context method .abort is abort context method Methods: Hidden: ._tymth is injected function wrapper closure returned by .tymen() of associated Tymist instance that returns Tymist .tyme. when called. ._tock is hidden attribute for .tock property """
[docs] def __init__(self, hab, server, verifier=None, exchanger=None, doers=None, **kwa): """ Initialize instance. Inherited Parameters: tymist is Tymist instance tock is float seconds initial value of .tock Parameters: db is database instance of local controller's context verifier (optional) is Verifier instance of local controller's TEL context server is TCP Server instance """ self.hab = hab self.verifier = verifier self.exchanger = exchanger self.server = server # use server for cx self.rants = dict() doers = doers if doers is not None else [] doers.extend([doing.doify(self.serviceDo)]) super(Directant, self).__init__(doers=doers, **kwa) if self.tymth: self.server.wind(self.tymth)
[docs] def wind(self, tymth): """ Inject new tymist.tymth as new ._tymth. Changes tymist.tyme base. Updates winds .tymer .tymth """ super(Directant, self).wind(tymth) self.server.wind(tymth)
[docs] def serviceDo(self, tymth=None, tock=0.0, **opts): """ Returns doifiable Doist compatibile generator method (doer dog) to service connections on .server. Creates remoter and rant (Reactant) for each open connection and adds rant to running doers. Doist Injected Attributes: g.tock = tock # default tock attributes g.done = None # default done state g.opts Parameters: tymth is injected function wrapper closure returned by .tymen() of Tymist instance. Calling tymth() returns associated Tymist .tyme. tock is injected initial tock value opts is dict of injected optional additional parameters Usage: add result of doify on this method to doers list """ yield # enter context while True: for ca, ix in list(self.server.ixes.items()): if ix.cutoff: self.closeConnection(ca) continue if ca not in self.rants: # create Reactant and extend doers with it rant = Reactant(tymth=self.tymth, hab=self.hab, verifier=self.verifier, exchanger=self.exchanger, remoter=ix) self.rants[ca] = rant # add Reactant (rant) doer to running doers self.extend(doers=[rant]) # open and run rant as doer if ix.tymeout > 0.0 and ix.tymer.expired: self.closeConnection(ca) # also removes rant yield
[docs] def closeConnection(self, ca): """ Close and remove connection given by ca and remove associated rant at ca. """ if ca in self.server.ixes: # remoter still there self.server.ixes[ca].serviceSends() # send final bytes to socket self.server.removeIx(ca) if ca in self.rants: # remove rant (Reactant) if any self.remove([self.rants[ca]]) # close and remove rant from doers list del self.rants[ca]
[docs] class Reactant(doing.DoDoer): """ Reactant Subclass of DoDoer with doers list from do generator methods: .msgDo, .cueDo, and .escrowDo. Enables continuous scheduling of doers (do generator instances or functions) Implements Doist like functionality to allow nested scheduling of doers. Each DoDoer runs a list of doers like a Doist but using the tyme from its injected tymist as injected by its parent DoDoer or Doist. Scheduling hierarchy: Doist->DoDoer...->DoDoer->Doers Attributes: .hab is Habitat instance of local controller's context .kevery is Kevery instance .remoter is TCP Remoter instance for connection from remote TCP client. Inherited Attributes: .done is Boolean completion state: True means completed Otherwise incomplete. Incompletion maybe due to close or abort. .opts is dict of injected options for its generator .do .doers is list of Doers or Doer like generator functions Inherited Properties: .tyme is float relative cycle time of associated Tymist .tyme obtained via injected .tymth function wrapper closure. .tymth is function wrapper closure returned by Tymist .tymeth() method. When .tymth is called it returns associated Tymist .tyme. .tymth provides injected dependency on Tymist tyme base. .tock is float, desired time in seconds between runs or until next run, non negative, zero means run asap Properties: Inherited Methods: .wind injects ._tymth dependency from associated Tymist to get its .tyme .__call__ makes instance callable Appears as generator function that returns generator .do is generator method that returns generator .enter is enter context action method .recur is recur context action method or generator method .clean is clean context action method .exit is exit context method .close is close context method .abort is abort context method Overidden Methods: Hidden: ._tymth is injected function wrapper closure returned by .tymen() of associated Tymist instance that returns Tymist .tyme. when called. ._tock is hidden attribute for .tock property """
[docs] def __init__(self, hab, remoter, verifier=None, exchanger=None, doers=None, **kwa): """ Initialize instance. Inherited Parameters: tymist is Tymist instance tock is float seconds initial value of .tock doers is list of doers (do generator instancs or functions) Parameters: hby is Habitat instance of local controller's context verifier is Verifier instance of local controller's TEL context remoter is TCP Remoter instance doers is list of doers (do generator instances, functions or methods) """ self.hab = hab self.verifier = verifier self.exchanger = exchanger self.remoter = remoter # use remoter for both rx and tx doers = doers if doers is not None else [] doers.extend([doing.doify(self.msgDo), doing.doify(self.cueDo), doing.doify(self.escrowDo)]) # needs unique kevery with ims per remoter connnection rvy = routing.Revery(db=hab.db) self.kevery = eventing.Kevery(db=self.hab.db, lax=False, local=False, rvy=rvy) if self.verifier is not None: self.tevery = Tevery(reger=self.verifier.reger, db=self.hab.db, local=False, rvy=rvy) self.tevery.registerReplyRoutes(router=rvy.rtr) else: self.tevery = None self.kevery.registerReplyRoutes(router=rvy.rtr) self.parser = parsing.Parser(ims=self.remoter.rxbs, framed=True, kvy=self.kevery, tvy=self.tevery, exc=self.exchanger, rvy=rvy) super(Reactant, self).__init__(doers=doers, **kwa) if self.tymth: self.remoter.wind(self.tymth)
[docs] def wind(self, tymth): """ Inject new tymist.tymth as new ._tymth. Changes tymist.tyme base. Updates winds .tymer .tymth """ super(Reactant, self).wind(tymth) self.remoter.wind(tymth)
[docs] def msgDo(self, tymth=None, tock=0.0, **opts): """ Returns doifiable Doist compatibile generator method (doer dog) to process incoming message stream of .kevery Doist Injected Attributes: g.tock = tock # default tock attributes g.done = None # default done state g.opts Parameters: tymth is injected function wrapper closure returned by .tymen() of Tymist instance. Calling tymth() returns associated Tymist .tyme. tock is injected initial tock value opts is dict of injected optional additional parameters Usage: add result of doify on this method to doers list """ yield # enter context if self.parser.ims: logger.info("Server %s: received:\n%s\n...\n", self.hab.name, self.parser.ims[:1024]) done = yield from self.parser.parsator() # process messages continuously return done # should nover get here except forced close
[docs] def cueDo(self, tymth=None, tock=0.0, **opts): """ Returns doifiable Doist compatibile generator method (doer dog) to process .kevery.cues deque Doist Injected Attributes: g.tock = tock # default tock attributes g.done = None # default done state g.opts Parameters: tymth is injected function wrapper closure returned by .tymen() of Tymist instance. Calling tymth() returns associated Tymist .tyme. tock is injected initial tock value opts is dict of injected optional additional parameters Usage: add result of doify on this method to doers list """ yield # enter context while True: for msg in self.hab.processCuesIter(self.kevery.cues): if isinstance(msg, list): msg = bytearray(itertools.chain(*msg)) self.sendMessage(msg, label="chit or receipt or replay") yield # throttle just do one cue at a time yield return False # should never get here except forced close
[docs] def escrowDo(self, tymth=None, tock=0.0, **opts): """ Returns doifiable Doist compatibile generator method (doer dog) to process .kevery escrows. Doist Injected Attributes: g.tock = tock # default tock attributes g.done = None # default done state g.opts Parameters: tymth is injected function wrapper closure returned by .tymen() of Tymist instance. Calling tymth() returns associated Tymist .tyme. tock is injected initial tock value opts is dict of injected optional additional parameters Usage: add result of doify on this method to doers list """ yield # enter context while True: self.kevery.processEscrows() if self.tevery is not None: self.tevery.processEscrows() yield return False # should never get here except forced close
[docs] def sendMessage(self, msg, label=""): """ Sends message msg and loggers label if any """ self.remoter.tx(msg) # send to remote logger.info("Server %s: sent %s:\n%d\n\n", self.hab.name, label, len(msg))
[docs] def runController(doers, expire=0.0): """ Utiitity Function to create doist to run doers """ tock = 0.03125 doist = doing.Doist(limit=expire, tock=tock, real=True) doist.do(doers=doers)