# -*- encoding: utf-8 -*-
"""
keri.app.signaling module
"""
import datetime
import time
import falcon
from hio.base import doing
from hio.help import decking
from ..core import Dicter
from ..help import fromIso8601, nowIso8601, nowUTC
[docs]
def signal(attrs, topic, ckey=None, dt=None):
"""
Parameters:
attrs (dict): payload of the notice
topic (str): routing for recipient of message
dt(Optional(str, datetime)): iso8601 formatted datetime of notice
ckey (str): collapse key
Returns:
Notice: Notice instance
"""
dt = dt if dt is not None else nowIso8601()
if hasattr(dt, "isoformat"):
dt = dt.isoformat()
pad = dict(i="",
dt=dt,
r=topic,
a=attrs
)
return Signal(pad=pad, ckey=ckey)
[docs]
class Signal(Dicter):
[docs]
def __init__(self, pad, ckey=None):
""" New Signal
Signals with a collapse key will replace any existing signal not yet read with a matching value
as collapse key
Parameters:
pad (dict): Attribute values that make up the payload of the signal
ckey (str): The collapse key to use for
"""
super(Signal, self).__init__(pad=pad)
self._ckey = ckey
if 'dt' not in self.pad:
self.pad['dt'] = nowIso8601()
@property
def topic(self):
if 'r' in self.pad:
return self.pad['r']
else:
return None
@property
def ckey(self):
return self._ckey
@property
def dt(self):
return self.pad['dt']
@dt.setter
def dt(self, dt):
if hasattr(dt, "isoformat"):
dt = dt.isoformat()
self.pad['dt'] = dt
@property
def attrs(self):
if 'a' in self.pad:
return self.pad['a']
return None
[docs]
class Signaler(doing.DoDoer):
""" Class for sending signals to the controller of an agent.
The signals are just pings to reload data and not persistent messages that can be reread
"""
SignalTimeout = datetime.timedelta(minutes=10)
[docs]
def __init__(self, signals=None):
"""
Parameters:
"""
self.signals = signals if signals is not None else decking.Deck()
doers = [doing.doify(self.expireDo)]
super(Signaler, self).__init__(doers=doers)
[docs]
def push(self, attrs, topic, ckey=None, dt=None):
"""
Parameters:
attrs (dict): signal attributes to push to the cue
topic (str): routing for recipient of message
ckey (str): collapse key
dt(Optional(str, datetime)): iso8601 formatted datetime of notice
Returns:
"""
dt = dt if dt is not None else nowIso8601()
sig = signal(attrs=attrs, topic=topic, ckey=ckey, dt=dt)
if sig.ckey is not None:
for i, s in enumerate(self.signals):
if s.ckey == sig.ckey:
self.signals[i] = sig
return
self.signals.append(sig)
[docs]
def expireDo(self, tymth=None, tock=0.0, **kwa):
"""
Returns doifiable Doist compatible generator method (doer dog)
Usage:
add result of doify on this method to doers list
Parameters:
tymth is injected function wrapper closure returned by .tymen() of
Tymist instance. Calling tymth() returns associated Tymist .tyme.
tock is injected initial tock value
"""
self.wind(tymth)
self.tock = tock
_ = (yield self.tock)
while True: # loop checking for expired messages
now = nowUTC()
toRemove = []
for sig in self.signals:
if now - fromIso8601(sig.dt) > self.SignalTimeout: # Expire messages that are too old
toRemove.append(sig)
yield self.tock
for sig in toRemove:
self.signals.remove(sig)
yield self.tock
[docs]
def loadEnds(app, *, signals=None):
""" Load endpoints for agent to controller messages
Args:
app (falcon.App): falcon.App to register handlers with:
signals (Deck): messages for the mailbox stream
Returns:
"""
sigEnd = SignalsEnd(signals=signals)
app.add_route("/mbx", sigEnd)
return sigEnd
[docs]
class SignalsEnd:
"""
HTTP handler that accepts and KERI events POSTed as the body of a request with all attachments to
the message as a CESR attachment HTTP header. KEL Messages are processed and added to the database
of the provided Habitat.
This also handles `req`, `exn` and `tel` messages that respond with a KEL replay.
"""
[docs]
def __init__(self, signals=None):
"""
Create the MBX HTTP server from the Habitat with an optional Falcon App to
register the routes with.
Parameters
rxbs (bytearray): output queue of bytes for message processing
mbx (Mailboxer): Mailbox storage
qrycues (Deck): inbound qry response queues
"""
self.signals = signals if signals is not None else decking.Deck()
[docs]
def on_post(self, req, rep):
"""
Handles POST for KERI mailbox service.
Parameters:
req (Request) Falcon HTTP request
rep (Response) Falcon HTTP response
.. code-block:: none
---
summary: Stream Server-Sent Events for KERI mailbox for identifier
description: Stream Server-Sent Events for KERI mailbox for identifier
tags:
- Mailbox
responses:
200:
content:
text/event-stream:
schema:
type: object
description: Signal query response for server sent events
204:
description: KEL or EXN event accepted.
"""
rep.set_header('Cache-Control', "no-cache")
rep.set_header('connection', "close")
rep.set_header('Content-Type', "text/event-stream")
rep.status = falcon.HTTP_200
rep.stream = SignalIterable(signals=self.signals)
[docs]
def on_get(self, req, rep):
"""
Handles GET requests as a stream of SSE events
Parameters:
req (Request) Falcon HTTP request
rep (Response) Falcon HTTP response
.. code-block:: none
---
summary: Stream Server-Sent Events for KERI mailbox for identifier
description: Stream Server-Sent Events for KERI mailbox for identifier
tags:
- Mailbox
responses:
200:
content:
text/event-stream:
schema:
type: object
description: Mailbox query response for server sent events
204:
description: KEL or EXN event accepted.
"""
rep.set_header('Cache-Control', "no-cache")
rep.set_header('connection', "close")
rep.set_header('Content-Type', "text/event-stream")
rep.stream = SignalIterable(signals=self.signals)
class SignalIterable:
TimeoutMBX = 300
def __init__(self, signals, retry=5000):
self.signals = signals
self.retry = retry
def __iter__(self):
self.start = self.end = time.perf_counter()
return self
def __next__(self):
if self.end - self.start < self.TimeoutMBX:
if self.start == self.end:
self.end = time.perf_counter()
return bytes(f"retry: {self.retry}\n\n".encode("utf-8"))
data = bytearray()
while self.signals:
sig = self.signals.popleft()
topic = sig.topic
if topic is not None:
data.extend(bytearray("id: {}\nretry: {}\nevent: {}\ndata: ".format(sig.rid, self.retry,
topic).encode("utf-8")))
else:
data.extend(bytearray("id: {}\nretry: {}\ndata: ".format(sig.id, self.retry).encode(
"utf-8")))
data.extend(sig.raw)
data.extend(b'\n\n')
self.end = time.perf_counter()
return bytes(data)
raise StopIteration