# -*- encoding: utf-8 -*-
"""
keri.core.routing module
"""
import datetime
import logging
import re
from hio.help import decking, ogler
from ..db import fetchTsgs
from ..help import helping
from ..kering import ConfigurationError, UnverifiedReplyError, ValidationError
from .coring import Dater, Diger, Ilks
from .eventing import validateSigs
logger = ogler.getLogger()
[docs]
class Router:
"""Reply message router
Reply message router that accepts registration of route `r` handlers and dispatches
reply messages to the appropriate handler.
"""
defaultResourceFunc = "processReply"
[docs]
def __init__(self, routes=None):
"""Initialized instance with optiona list of existing routes
Parameters:
routes (list): preregistered routes for this router
"""
self.routes = routes if routes is not None else list()
[docs]
def addRoute(self, routeTemplate, resource, suffix=None):
"""Add a route between a route template and a resource
Parameters:
routeTemplate (str): a route template to use for the resource
resource (object): the resource instance to associate with the route template
suffix(str, optional): Optional responder name suffix for this route. If a suffix is provided,
Router will map reply routes to processReply{suffix}(). In this way, multiple closely-related routes
can be mapped to the same resource.
"""
fields, regex = compile_uri_template(routeTemplate)
self.routes.append(
Route(regex=regex, fields=fields, resource=resource, suffix=suffix)
)
[docs]
def dispatch(self, serder, diger, cigars, tsgs):
"""
Parameters:
serder:
diger:
cigars:
tsgs:
Returns:
"""
ked = serder.ked
# Dispatch based on route
r = ked["r"]
route, match = self._find(route=r)
if route is None:
raise ValidationError(f"No resource is registered to handle route {r}")
fname = self.defaultResourceFunc
if route.suffix is not None:
fname += route.suffix
kwargs = match.groupdict()
for name in route.fields:
if name not in kwargs:
raise ValidationError(f"parameter {name} not found in route {r}")
fn = getattr(route.resource, fname, self.processRouteNotFound)
fn(serder=serder, diger=diger, route=r, cigars=cigars, tsgs=tsgs, **kwargs)
def _find(self, route):
"""Linear seach thru added routes, returning the first one that matchs
Searches through the registered routes until a regex in one of the routes matches
the provided route and returns the Route object along with the re.Match object.
Parameters:
route (str): the route from the `r` of the reply message
Returns:
Route: the Route object with the resource that is registered to process this rpy message
re.Match: the regular expression match that contains the grouping of matched parameters.
"""
for r in self.routes:
if res := r.regex.search(route):
return r, res
return None, None
[docs]
def processRouteNotFound(
self, *, serder, diger, route, cigars=None, tsgs=None, **kwargs
):
"""Default handler for processing reply message with an unregistered route
Parameters:
serder (Serder): reply event message
diger (Diger): instance from said of reply serder
route (str): route ('r') of the event message
cigars (Optional(list)): list of non-transferable signature tuples
tsgs (Optional(list)): list of transferable signature tuples
**kwargs (dict) additional keyword args
"""
raise ConfigurationError(
f"Resource registered for route {route} in {Ilks.rpy}"
f"does not contain the correct processReply method"
)
[docs]
class Revery:
"""Reply message event processor"""
TimeoutRPE = 3600 # seconds to timeout reply message escrows
[docs]
def __init__(self, db, rtr=None, cues=None, lax=True, local=False):
"""
Parameters:
db:
cues:
lax:
local:
"""
self.db = db
self.rtr = rtr if rtr is not None else Router()
self.cues = cues if cues is not None else decking.Deck()
self.lax = True if lax else False # promiscuous mode
self.local = True if local else False # local vs nonlocal restrictions
@property
def prefixes(self):
"""
Returns .db.prefixes
"""
return self.db.prefixes
[docs]
def processReply(self, serder, cigars=None, tsgs=None, **kwa):
"""
Process one reply message with either attached nontrans signing couples
in cigars or attached trans indexed sig groups in tsgs. Process logic
is route dependent and dispatched by route.
Parameters:
serder (Serder): instance of reply message
cigars (list): of Cigar instances that contain nontrans signing couple
signature in .raw and public key in .verfer
tsgs (list): tuples (quadruples) of form
(prefixer, seqner, diger, [sigers]) where:
prefixer is pre of trans endorser
number is sequence number of trans endorser's est evt for keys for sigs
diger is digest of trans endorser's est evt for keys for sigs
[sigers] is list of indexed sigs from trans endorser's keys from est evt
BADA (Best Available Data Acceptance) model for each reply message.
Latest-Seen-Signed Pairwise comparison of new update reply compared to
old already accepted reply from same source for same route (same data).
Accept new reply (update) if new reply is later than old reply where:
1. Later means date-time-stamp of new is greater than old.
2. If non-trans signer, later also means the sn (sequence number)
of the last (if forked) Est evt that provides keys for
signature(s) of new is greater than or equal to the sn of the
last Est evt that provides keys for signature(s) of new.
If nontrans and last Est Evt is not yet accepted then escrow.
If nontrans and partially signed then escrow.
Escrow process logic is route dependent and is dispatched by route,
i.e. route is address of buffer with route specific handler of escrow.
"""
# verify said of reply via Serder (handles protocol/ilk-specific multi-SAID logic)
if not serder.verify():
raise ValidationError(f"Invalid said for reply msg={serder.ked}.")
diger = Diger(qb64=serder.said)
self.rtr.dispatch(serder=serder, diger=diger, cigars=cigars, tsgs=tsgs)
[docs]
def acceptReply(
self, serder, saider, route, aid, osaider=None, cigars=None, tsgs=None
):
"""Applies Best Available Data Acceptance policy to reply and signatures
Returns:
bool: True is successfully accepted. False otherwise
Parameters:
serder (Serder): instance of reply msg (SAD)
saider (Diger): instance from said in serder (SAD)
osaider (Diger): instance of diger for previous reply if any
route (str): reply route
aid (str): identifier prefix qb64 of authorizing attributable ID
cigars (list): of Cigar instances that contain nontrans signing couple
signature in .raw and public key in .verfer
tsgs (list): tuples (quadruples) of form
(prefixer, seqner, diger, [sigers]) where:
prefixer is pre of trans endorser
number is sequence number of trans endorser's est evt for keys for sigs
diger is digest of trans endorser's est evt for keys for sigs
[sigers] is list of indexed sigs from trans endorser's keys from est evt
BADA (Best Available Data Acceptance) model for each reply message.
Latest-Seen-Signed Pairwise comparison of new update reply compared to
old already accepted reply from same source for same route (same data).
Accept new reply (update) if new reply is later than old reply where:
1. If transferable, later is true when either:
- the sn (sequence number) of the last (if forked) Est evt that
provides keys for signature(s) of new is greater than the sn
of the last Est evt that provides keys for signature(s) of old.
- the sn of new equals the sn of old and the date-time-stamp of
new is greater than old.
2. If non-transferable, later is true if the date-time-stamp of
new is greater than old.
3. Otherwise, later is false.
If nontrans and last Est Evt is not yet accepted then escrow.
If nontrans and partially signed then escrow.
Escrow process logic is route dependent and is dispatched by route,
i.e. route is address of buffer with route specific handler of escrow.
"""
# BADA logic.
accepted = False # flag to raise UnverifiedReplyError not accepted
cigars = cigars if cigars is not None else []
tsgs = tsgs if tsgs is not None else []
# Is new later than old if old?
# get date-time raises error if empty or invalid format
dater = Dater(dts=serder.ked["dt"])
odater = None
if osaider:
odater = self.db.sdts.get(keys=osaider.qb64b)
for cigar in cigars: # process each couple to verify sig and write to db
if cigar.verfer.transferable: # ignore invalid transferable verfers
logger.info(
"Revery: skipped invalid transferable verfers on reply said = %s",
serder.said,
)
continue # skip invalid transferable
if not self.lax and cigar.verfer.qb64 in self.prefixes: # own cig
if not self.local: # own cig when not local so ignore
logger.info(
"Revery: skipped own attachment for AID %s"
" on non-local reply at route = %s",
aid,
serder.ked["r"],
)
logger.debug("Reply Body=\n%s\n", serder.pretty())
continue # skip own cig attachment on non-local reply msg
if aid != cigar.verfer.qb64: # cig not by aid
logger.info(
"Revery: skipped cig not from aid=%s on reply at route %s",
aid,
serder.ked["r"],
)
logger.debug("Reply Body=\n%s\n", serder.pretty())
continue # skip invalid cig's verfer is not aid
if odater: # get old compare datetimes to see if later
if dater.datetime <= odater.datetime:
logger.trace(
"Revery: skipped stale update from %s of reply at route= %s",
aid,
serder.ked["r"],
)
logger.trace("Reply Body=\n%s\n", serder.pretty())
continue # skip if not later
# raise ValidationError(f"Stale update of {route} from {aid} "
# f"via {Ilks.rpy}={serder.ked}.")
if not cigar.verfer.verify(cigar.raw, serder.raw): # cig not verify
logger.info(
"Revery: skipped non-verifying cig from %s on reply at route = %s",
cigar.verfer.qb64,
serder.ked["r"],
)
logger.debug("Reply Body=\n%s\n", serder.pretty())
continue # skip if cig not verify
# All constraints satisfied so update
self.updateReply(serder=serder, saider=saider, dater=dater, cigar=cigar)
self.removeReply(saider=osaider) # remove obsoleted reply artifacts
accepted = True
break # first valid cigar sufficient ignore any duplicates in cigars
for prefixer, snumber, sdiger, sigers in tsgs: # iterate over each tsg
if not self.lax and prefixer.qb64 in self.prefixes: # own sig
if not self.local: # own sig when not local so ignore
logger.debug(
"Revery: skipped own attachment on nonlocal reply said=%s",
serder.said,
)
logger.debug("event=\n%s\n", serder.pretty())
continue # skip own sig attachment on non-local reply msg
spre = prefixer.qb64
if aid != spre: # sig not by aid
logger.info(
"Revery: skipped signature not from aid=%s on reply said=%s",
aid,
serder.said,
)
logger.debug(f"event=\n{serder.pretty()}\n")
continue # skip invalid signature is not from aid
if osaider: # check if later logic sn > or sn == and dt >
if otsgs := fetchTsgs(db=self.db.ssgs, diger=osaider):
_, osqr, _, _ = otsgs[0] # zeroth should be authoritative
if snumber.sn < osqr.sn: # sn earlier
logger.info(
"Revery: skipped stale key state sig "
"from %s sn=%s<%s on reply said=%s",
aid,
snumber.sn,
osqr.sn,
serder.said,
)
logger.debug("event=\n%s\n", serder.pretty())
continue # skip if sn earlier
if snumber.sn == osqr.sn: # sn same so check datetime
if odater:
if dater.datetime <= odater.datetime:
logger.info(
"Revery: skipped stale key state sig datetime "
"from %s on reply said=%s",
aid,
serder.said,
)
logger.debug("event=\n%s\n", serder.pretty())
continue # skip if not later
# retrieve sdig of last event at sn of signer.
sdig = self.db.kels.getLast(keys=spre, on=snumber.sn)
if sdig is None:
# create cue here to request key state for sprefixer signer
# signer's est event not yet in signer's KEL
logger.info(
"Revery: escrowing without key state for signer on reply said=%s",
serder.said,
)
self.escrowReply(
serder=serder,
saider=saider,
dater=dater,
route=route,
prefixer=prefixer,
snumber=snumber,
sdiger=sdiger,
sigers=sigers,
)
self.cues.append(dict(kin="query", q=dict(pre=spre)))
continue
sdig = sdig.encode("utf-8")
# retrieve last event itself of signer given sdig
sserder = self.db.evts.get(keys=(spre, bytes(sdig)))
# assumes db ensures that sserder must not be none because sdig was in KE
if sserder.said != sdiger.qb64: # signer's dig not match est evt
raise ValidationError(
f"Bad trans indexed sig group at sn = "
f"{snumber.sn} for reply = {serder.ked}."
)
# verify sigs
if not (sverfers := sserder.verfers):
raise ValidationError(
f"Invalid reply from signer={spre}, no "
f"keys at signer's est. event sn={snumber.sn}."
)
# fetch any escrowed sigs, extract just the siger from each quad
# want sn in numerical order so use hex
quadkeys = (saider.qb64, prefixer.qb64, f"{snumber.sn:032x}", sdiger.qb64)
esigers = self.db.ssgs.get(keys=quadkeys)
sigers.extend(esigers)
sigers, valid = validateSigs(
serder=serder, sigers=sigers, verfers=sverfers, tholder=sserder.tholder
)
# no error so at least one verified siger
if valid: # meet threshold so save
# All constraints satisfied so update
self.updateReply(
serder=serder,
saider=saider,
dater=dater,
prefixer=prefixer,
seqner=snumber,
diger=sdiger,
sigers=sigers,
)
self.removeReply(saider=osaider) # remove obsoleted reply artifacts
# remove stale signatures .ssgs for this saider
# this ensures that zeroth tsg is authoritative
for prr, snr, dgr, _ in fetchTsgs(
db=self.db.ssgs, diger=saider, snh=snumber.snh
):
if (snr.sn < snumber.sn) or (
snr.sn == snumber.sn and dgr.qb64 != sdiger.qb64
):
self.db.ssgs.trim(
keys=(prr.qb64, f"{snr.sn:032h}", dgr.qb64, "")
)
accepted = True
else: # not meet threshold so escrow
self.escrowReply(
serder=serder,
saider=saider,
dater=dater,
route=route,
prefixer=prefixer,
snumber=snumber,
sdiger=sdiger,
sigers=sigers,
)
return accepted
[docs]
def updateReply(
self,
*,
serder,
saider,
dater,
cigar=None,
prefixer=None,
seqner=None,
diger=None,
sigers=None,
):
"""Update Reply SAD in database
Update Reply SAD in database given by by serder and associated databases
for attached cig couple or sig quadruple.
Overwrites val at key if already exists.
Parameters:
serder (Serder): instance of reply msg (SAD)
saider (Diger): instance from said in serder (SAD)
dater (Dater): instance from date-time in serder (SAD)
cigar (Cigar): instance that contain nontrans signing couple
signature in .raw and public key in .verfer
prefixer (Prefixer): is pre of trans endorser
seqner (Seqner): is sequence number of trans endorser's est evt for keys for sigs
diger (Diger): is digest of trans endorser's est evt for keys for sigs
sigers (list): of indexed sigs from trans endorser's key from est evt
"""
keys = (saider.qb64,)
self.db.sdts.put(keys=keys, val=dater) # first one idempotent
self.db.rpys.put(keys=keys, val=serder) # first one idempotent
if cigar:
self.db.scgs.put(keys=keys, vals=[(cigar.verfer, cigar)])
if sigers: # want sn in numerical order so use hex
quadkeys = (saider.qb64, prefixer.qb64, f"{seqner.sn:032x}", diger.qb64)
self.db.ssgs.put(keys=quadkeys, vals=sigers)
[docs]
def removeReply(self, saider):
"""Remove Reply SAD artifacts given by saider.
Parameters:
saider (Diger): instance from said in serder (SAD)
"""
if saider:
keys = (saider.qb64,)
self.db.ssgs.trim(keys=(saider.qb64, "")) # remove whole branch
self.db.scgs.rem(keys=keys)
self.db.rpys.rem(keys=keys)
self.db.sdts.rem(keys=keys)
[docs]
def escrowReply(
self, *, serder, saider, dater, route, prefixer, snumber, sdiger, sigers
):
"""Escrow reply by route
Parameters:
serder (Serder): instance of reply msg (SAD)
saider (Diger): instance from said in serder (SAD)
dater (Dater): instance from date-time in serder (SAD)
route (str): reply route
prefixer (Prefixer): is pre of trans endorser
snumber (Number): is sequence number of trans endorser's est evt for keys for sigs
sdiger (Diger) is said of trans endorser's est evt for keys for sigs
sigers (list): is indexed sigs from trans endorser's key from est evt
"""
if not sigers:
return # nothing to escrow
keys = (saider.qb64,)
self.db.sdts.put(keys=keys, val=dater) # first one idempotent
self.db.rpys.put(keys=keys, val=serder) # first one idempotent
quadkeys = (saider.qb64, prefixer.qb64, f"{snumber.sn:032x}", sdiger.qb64)
self.db.ssgs.put(keys=quadkeys, vals=sigers)
self.db.rpes.put(keys=(route,), vals=[saider])
[docs]
def processEscrowReply(self):
"""Process escrows for reply messages.
Escrows are keyed by reply route and val is reply said
triple (prefixer, seqner, diger)
quadruple (prefixer, seqner, diger, siger)
"""
for (route,), diger in self.db.rpes.getTopItemIter():
try:
tsgs = fetchTsgs(db=self.db.ssgs, diger=diger)
keys = (diger.qb64,)
dater = self.db.sdts.get(keys=keys)
serder = self.db.rpys.get(keys=keys)
try:
if not (dater and serder and tsgs):
raise ValueError(
f"Missing escrow artifacts at said={diger.qb64}"
f"for route={route}."
)
# do date math for stale escrow
if (helping.nowUTC() - dater.datetime) > datetime.timedelta(
seconds=self.TimeoutRPE
):
# escrow stale so raise ValidationError which unescrows below
logger.info(
"Revery unescrow error: Stale reply escrow at route = %s",
route,
)
raise ValidationError(f"Stale reply escrow at route = {route}.")
self.processReply(serder=serder, tsgs=tsgs)
except UnverifiedReplyError as ex:
# still waiting on missing prior event to validate
if logger.isEnabledFor(logging.TRACE):
logger.trace("Revery unescrow attempt failed: %s\n", ex.args[0])
except Exception as ex: # other error so remove from reply escrow
self.db.rpes.rem(keys=(route,), val=diger) # remove escrow only
self.removeReply(diger) # remove escrow reply artifacts
if logger.isEnabledFor(logging.DEBUG):
logger.exception(
"Revery unescrowed due to error: %s", ex.args[0]
)
else:
logger.error("Revery unescrowed due to error: %s", ex.args[0])
else: # unescrow succeded
self.db.rpes.rem(keys=(route,), val=diger) # remove escrow only
logger.info(
"Revery unescrow succeeded for reply said=%s", serder.said
)
logger.debug("event=\n%s\n", serder.pretty())
except Exception as ex: # log diagnostics errors etc
self.db.rpes.rem(keys=(route,), val=diger) # remove escrow only
self.removeReply(diger) # remove escrow reply artifacts
if logger.isEnabledFor(logging.DEBUG):
logger.exception("Revery unescrowed due to error: %s", ex.args[0])
else:
logger.error("Revery unescrowed due to error: %s", ex.args[0])
[docs]
class Route:
"""Route class for registration of reply message handlers
This class represents a registered route internally to the Revery.
the properties are created by using the Falcon compile route utility method
Properties:
.regex(re): compiled url template regex
.fields(set): field names for matches in regex
.resource(object): the handler for this route
.suffix(Optional(str)): a suffix to be applied to the handler method
"""
[docs]
def __init__(self, regex, fields, resource, suffix=None):
"""Initialize instance of route
Parameters:
regex(re): compiled url template regex
fields(set): field names for matches in regex
resource(object): the handler for this route
suffix(Optional(str)): a suffix to be applied to the handler method
"""
self.regex = regex
self.fields = fields
self.resource = resource
self.suffix = suffix
[docs]
def compile_uri_template(template):
"""Compile the given URI template string into a pattern matcher.
This function can be used to construct custom routing engines that
iterate through a list of possible routes, attempting to match
an incoming request against each route's compiled regular expression.
Each field is converted to a named group, so that when a match
is found, the fields can be easily extracted using
:py:meth:`re.MatchObject.groupdict`.
This function does not support the more flexible templating
syntax used in the default router. Only simple paths with bracketed
field expressions are recognized. For example::
/
/books
/books/{isbn}
/books/{isbn}/characters
/books/{isbn}/characters/{name}
Also, note that if the template contains a trailing slash character,
it will be stripped in order to normalize the routing logic.
Args:
template(str): The template to compile. Note that field names are
restricted to ASCII a-z, A-Z, and the underscore character.
Returns:
tuple: (template_field_names, template_regex)
"""
if not isinstance(template, str):
raise TypeError("uri_template is not a string")
if not template.startswith("/"):
raise ValueError("uri_template must start with '/'")
if "//" in template:
raise ValueError("uri_template may not contain '//'")
if template != "/" and template.endswith("/"):
template = template[:-1]
# template names should be able to start with A-Za-z
# but also contain 0-9_ in the remaining portion
expression_pattern = r"{([a-zA-Z]\w*)}"
# Get a list of field names
fields = set(re.findall(expression_pattern, template))
# Convert Level 1 var patterns to equivalent named regex groups
escaped = re.sub(r"[\.\(\)\[\]\?\*\+\^\|]", r"\\\g<0>", template)
pattern = re.sub(expression_pattern, r"(?P<\1>[^/]+)", escaped)
pattern = r"\A" + pattern + r"\Z"
return fields, re.compile(pattern, re.IGNORECASE)