Source code for keri.core.routing

# -*- encoding: utf-8 -*-
"""
keri.core.routing module

"""

import datetime
import logging
import re

from hio.help import decking, ogler

from ..db import fetchTsgs
from ..help import helping
from ..kering import ConfigurationError, UnverifiedReplyError, ValidationError
from .coring import Dater, Diger, Ilks
from .eventing import validateSigs

logger = ogler.getLogger()


[docs] class Router: """Reply message router Reply message router that accepts registration of route `r` handlers and dispatches reply messages to the appropriate handler. """ defaultResourceFunc = "processReply"
[docs] def __init__(self, routes=None): """Initialized instance with optiona list of existing routes Parameters: routes (list): preregistered routes for this router """ self.routes = routes if routes is not None else list()
[docs] def addRoute(self, routeTemplate, resource, suffix=None): """Add a route between a route template and a resource Parameters: routeTemplate (str): a route template to use for the resource resource (object): the resource instance to associate with the route template suffix(str, optional): Optional responder name suffix for this route. If a suffix is provided, Router will map reply routes to processReply{suffix}(). In this way, multiple closely-related routes can be mapped to the same resource. """ fields, regex = compile_uri_template(routeTemplate) self.routes.append( Route(regex=regex, fields=fields, resource=resource, suffix=suffix) )
[docs] def dispatch(self, serder, diger, cigars, tsgs): """ Parameters: serder: diger: cigars: tsgs: Returns: """ ked = serder.ked # Dispatch based on route r = ked["r"] route, match = self._find(route=r) if route is None: raise ValidationError(f"No resource is registered to handle route {r}") fname = self.defaultResourceFunc if route.suffix is not None: fname += route.suffix kwargs = match.groupdict() for name in route.fields: if name not in kwargs: raise ValidationError(f"parameter {name} not found in route {r}") fn = getattr(route.resource, fname, self.processRouteNotFound) fn(serder=serder, diger=diger, route=r, cigars=cigars, tsgs=tsgs, **kwargs)
def _find(self, route): """Linear seach thru added routes, returning the first one that matchs Searches through the registered routes until a regex in one of the routes matches the provided route and returns the Route object along with the re.Match object. Parameters: route (str): the route from the `r` of the reply message Returns: Route: the Route object with the resource that is registered to process this rpy message re.Match: the regular expression match that contains the grouping of matched parameters. """ for r in self.routes: if res := r.regex.search(route): return r, res return None, None
[docs] def processRouteNotFound( self, *, serder, diger, route, cigars=None, tsgs=None, **kwargs ): """Default handler for processing reply message with an unregistered route Parameters: serder (Serder): reply event message diger (Diger): instance from said of reply serder route (str): route ('r') of the event message cigars (Optional(list)): list of non-transferable signature tuples tsgs (Optional(list)): list of transferable signature tuples **kwargs (dict) additional keyword args """ raise ConfigurationError( f"Resource registered for route {route} in {Ilks.rpy}" f"does not contain the correct processReply method" )
[docs] class Revery: """Reply message event processor""" TimeoutRPE = 3600 # seconds to timeout reply message escrows
[docs] def __init__(self, db, rtr=None, cues=None, lax=True, local=False): """ Parameters: db: cues: lax: local: """ self.db = db self.rtr = rtr if rtr is not None else Router() self.cues = cues if cues is not None else decking.Deck() self.lax = True if lax else False # promiscuous mode self.local = True if local else False # local vs nonlocal restrictions
@property def prefixes(self): """ Returns .db.prefixes """ return self.db.prefixes
[docs] def processReply(self, serder, cigars=None, tsgs=None, **kwa): """ Process one reply message with either attached nontrans signing couples in cigars or attached trans indexed sig groups in tsgs. Process logic is route dependent and dispatched by route. Parameters: serder (Serder): instance of reply message cigars (list): of Cigar instances that contain nontrans signing couple signature in .raw and public key in .verfer tsgs (list): tuples (quadruples) of form (prefixer, seqner, diger, [sigers]) where: prefixer is pre of trans endorser number is sequence number of trans endorser's est evt for keys for sigs diger is digest of trans endorser's est evt for keys for sigs [sigers] is list of indexed sigs from trans endorser's keys from est evt BADA (Best Available Data Acceptance) model for each reply message. Latest-Seen-Signed Pairwise comparison of new update reply compared to old already accepted reply from same source for same route (same data). Accept new reply (update) if new reply is later than old reply where: 1. Later means date-time-stamp of new is greater than old. 2. If non-trans signer, later also means the sn (sequence number) of the last (if forked) Est evt that provides keys for signature(s) of new is greater than or equal to the sn of the last Est evt that provides keys for signature(s) of new. If nontrans and last Est Evt is not yet accepted then escrow. If nontrans and partially signed then escrow. Escrow process logic is route dependent and is dispatched by route, i.e. route is address of buffer with route specific handler of escrow. """ # verify said of reply via Serder (handles protocol/ilk-specific multi-SAID logic) if not serder.verify(): raise ValidationError(f"Invalid said for reply msg={serder.ked}.") diger = Diger(qb64=serder.said) self.rtr.dispatch(serder=serder, diger=diger, cigars=cigars, tsgs=tsgs)
[docs] def acceptReply( self, serder, saider, route, aid, osaider=None, cigars=None, tsgs=None ): """Applies Best Available Data Acceptance policy to reply and signatures Returns: bool: True is successfully accepted. False otherwise Parameters: serder (Serder): instance of reply msg (SAD) saider (Diger): instance from said in serder (SAD) osaider (Diger): instance of diger for previous reply if any route (str): reply route aid (str): identifier prefix qb64 of authorizing attributable ID cigars (list): of Cigar instances that contain nontrans signing couple signature in .raw and public key in .verfer tsgs (list): tuples (quadruples) of form (prefixer, seqner, diger, [sigers]) where: prefixer is pre of trans endorser number is sequence number of trans endorser's est evt for keys for sigs diger is digest of trans endorser's est evt for keys for sigs [sigers] is list of indexed sigs from trans endorser's keys from est evt BADA (Best Available Data Acceptance) model for each reply message. Latest-Seen-Signed Pairwise comparison of new update reply compared to old already accepted reply from same source for same route (same data). Accept new reply (update) if new reply is later than old reply where: 1. If transferable, later is true when either: - the sn (sequence number) of the last (if forked) Est evt that provides keys for signature(s) of new is greater than the sn of the last Est evt that provides keys for signature(s) of old. - the sn of new equals the sn of old and the date-time-stamp of new is greater than old. 2. If non-transferable, later is true if the date-time-stamp of new is greater than old. 3. Otherwise, later is false. If nontrans and last Est Evt is not yet accepted then escrow. If nontrans and partially signed then escrow. Escrow process logic is route dependent and is dispatched by route, i.e. route is address of buffer with route specific handler of escrow. """ # BADA logic. accepted = False # flag to raise UnverifiedReplyError not accepted cigars = cigars if cigars is not None else [] tsgs = tsgs if tsgs is not None else [] # Is new later than old if old? # get date-time raises error if empty or invalid format dater = Dater(dts=serder.ked["dt"]) odater = None if osaider: odater = self.db.sdts.get(keys=osaider.qb64b) for cigar in cigars: # process each couple to verify sig and write to db if cigar.verfer.transferable: # ignore invalid transferable verfers logger.info( "Revery: skipped invalid transferable verfers on reply said = %s", serder.said, ) continue # skip invalid transferable if not self.lax and cigar.verfer.qb64 in self.prefixes: # own cig if not self.local: # own cig when not local so ignore logger.info( "Revery: skipped own attachment for AID %s" " on non-local reply at route = %s", aid, serder.ked["r"], ) logger.debug("Reply Body=\n%s\n", serder.pretty()) continue # skip own cig attachment on non-local reply msg if aid != cigar.verfer.qb64: # cig not by aid logger.info( "Revery: skipped cig not from aid=%s on reply at route %s", aid, serder.ked["r"], ) logger.debug("Reply Body=\n%s\n", serder.pretty()) continue # skip invalid cig's verfer is not aid if odater: # get old compare datetimes to see if later if dater.datetime <= odater.datetime: logger.trace( "Revery: skipped stale update from %s of reply at route= %s", aid, serder.ked["r"], ) logger.trace("Reply Body=\n%s\n", serder.pretty()) continue # skip if not later # raise ValidationError(f"Stale update of {route} from {aid} " # f"via {Ilks.rpy}={serder.ked}.") if not cigar.verfer.verify(cigar.raw, serder.raw): # cig not verify logger.info( "Revery: skipped non-verifying cig from %s on reply at route = %s", cigar.verfer.qb64, serder.ked["r"], ) logger.debug("Reply Body=\n%s\n", serder.pretty()) continue # skip if cig not verify # All constraints satisfied so update self.updateReply(serder=serder, saider=saider, dater=dater, cigar=cigar) self.removeReply(saider=osaider) # remove obsoleted reply artifacts accepted = True break # first valid cigar sufficient ignore any duplicates in cigars for prefixer, snumber, sdiger, sigers in tsgs: # iterate over each tsg if not self.lax and prefixer.qb64 in self.prefixes: # own sig if not self.local: # own sig when not local so ignore logger.debug( "Revery: skipped own attachment on nonlocal reply said=%s", serder.said, ) logger.debug("event=\n%s\n", serder.pretty()) continue # skip own sig attachment on non-local reply msg spre = prefixer.qb64 if aid != spre: # sig not by aid logger.info( "Revery: skipped signature not from aid=%s on reply said=%s", aid, serder.said, ) logger.debug(f"event=\n{serder.pretty()}\n") continue # skip invalid signature is not from aid if osaider: # check if later logic sn > or sn == and dt > if otsgs := fetchTsgs(db=self.db.ssgs, diger=osaider): _, osqr, _, _ = otsgs[0] # zeroth should be authoritative if snumber.sn < osqr.sn: # sn earlier logger.info( "Revery: skipped stale key state sig " "from %s sn=%s<%s on reply said=%s", aid, snumber.sn, osqr.sn, serder.said, ) logger.debug("event=\n%s\n", serder.pretty()) continue # skip if sn earlier if snumber.sn == osqr.sn: # sn same so check datetime if odater: if dater.datetime <= odater.datetime: logger.info( "Revery: skipped stale key state sig datetime " "from %s on reply said=%s", aid, serder.said, ) logger.debug("event=\n%s\n", serder.pretty()) continue # skip if not later # retrieve sdig of last event at sn of signer. sdig = self.db.kels.getLast(keys=spre, on=snumber.sn) if sdig is None: # create cue here to request key state for sprefixer signer # signer's est event not yet in signer's KEL logger.info( "Revery: escrowing without key state for signer on reply said=%s", serder.said, ) self.escrowReply( serder=serder, saider=saider, dater=dater, route=route, prefixer=prefixer, snumber=snumber, sdiger=sdiger, sigers=sigers, ) self.cues.append(dict(kin="query", q=dict(pre=spre))) continue sdig = sdig.encode("utf-8") # retrieve last event itself of signer given sdig sserder = self.db.evts.get(keys=(spre, bytes(sdig))) # assumes db ensures that sserder must not be none because sdig was in KE if sserder.said != sdiger.qb64: # signer's dig not match est evt raise ValidationError( f"Bad trans indexed sig group at sn = " f"{snumber.sn} for reply = {serder.ked}." ) # verify sigs if not (sverfers := sserder.verfers): raise ValidationError( f"Invalid reply from signer={spre}, no " f"keys at signer's est. event sn={snumber.sn}." ) # fetch any escrowed sigs, extract just the siger from each quad # want sn in numerical order so use hex quadkeys = (saider.qb64, prefixer.qb64, f"{snumber.sn:032x}", sdiger.qb64) esigers = self.db.ssgs.get(keys=quadkeys) sigers.extend(esigers) sigers, valid = validateSigs( serder=serder, sigers=sigers, verfers=sverfers, tholder=sserder.tholder ) # no error so at least one verified siger if valid: # meet threshold so save # All constraints satisfied so update self.updateReply( serder=serder, saider=saider, dater=dater, prefixer=prefixer, seqner=snumber, diger=sdiger, sigers=sigers, ) self.removeReply(saider=osaider) # remove obsoleted reply artifacts # remove stale signatures .ssgs for this saider # this ensures that zeroth tsg is authoritative for prr, snr, dgr, _ in fetchTsgs( db=self.db.ssgs, diger=saider, snh=snumber.snh ): if (snr.sn < snumber.sn) or ( snr.sn == snumber.sn and dgr.qb64 != sdiger.qb64 ): self.db.ssgs.trim( keys=(prr.qb64, f"{snr.sn:032h}", dgr.qb64, "") ) accepted = True else: # not meet threshold so escrow self.escrowReply( serder=serder, saider=saider, dater=dater, route=route, prefixer=prefixer, snumber=snumber, sdiger=sdiger, sigers=sigers, ) return accepted
[docs] def updateReply( self, *, serder, saider, dater, cigar=None, prefixer=None, seqner=None, diger=None, sigers=None, ): """Update Reply SAD in database Update Reply SAD in database given by by serder and associated databases for attached cig couple or sig quadruple. Overwrites val at key if already exists. Parameters: serder (Serder): instance of reply msg (SAD) saider (Diger): instance from said in serder (SAD) dater (Dater): instance from date-time in serder (SAD) cigar (Cigar): instance that contain nontrans signing couple signature in .raw and public key in .verfer prefixer (Prefixer): is pre of trans endorser seqner (Seqner): is sequence number of trans endorser's est evt for keys for sigs diger (Diger): is digest of trans endorser's est evt for keys for sigs sigers (list): of indexed sigs from trans endorser's key from est evt """ keys = (saider.qb64,) self.db.sdts.put(keys=keys, val=dater) # first one idempotent self.db.rpys.put(keys=keys, val=serder) # first one idempotent if cigar: self.db.scgs.put(keys=keys, vals=[(cigar.verfer, cigar)]) if sigers: # want sn in numerical order so use hex quadkeys = (saider.qb64, prefixer.qb64, f"{seqner.sn:032x}", diger.qb64) self.db.ssgs.put(keys=quadkeys, vals=sigers)
[docs] def removeReply(self, saider): """Remove Reply SAD artifacts given by saider. Parameters: saider (Diger): instance from said in serder (SAD) """ if saider: keys = (saider.qb64,) self.db.ssgs.trim(keys=(saider.qb64, "")) # remove whole branch self.db.scgs.rem(keys=keys) self.db.rpys.rem(keys=keys) self.db.sdts.rem(keys=keys)
[docs] def escrowReply( self, *, serder, saider, dater, route, prefixer, snumber, sdiger, sigers ): """Escrow reply by route Parameters: serder (Serder): instance of reply msg (SAD) saider (Diger): instance from said in serder (SAD) dater (Dater): instance from date-time in serder (SAD) route (str): reply route prefixer (Prefixer): is pre of trans endorser snumber (Number): is sequence number of trans endorser's est evt for keys for sigs sdiger (Diger) is said of trans endorser's est evt for keys for sigs sigers (list): is indexed sigs from trans endorser's key from est evt """ if not sigers: return # nothing to escrow keys = (saider.qb64,) self.db.sdts.put(keys=keys, val=dater) # first one idempotent self.db.rpys.put(keys=keys, val=serder) # first one idempotent quadkeys = (saider.qb64, prefixer.qb64, f"{snumber.sn:032x}", sdiger.qb64) self.db.ssgs.put(keys=quadkeys, vals=sigers) self.db.rpes.put(keys=(route,), vals=[saider])
[docs] def processEscrowReply(self): """Process escrows for reply messages. Escrows are keyed by reply route and val is reply said triple (prefixer, seqner, diger) quadruple (prefixer, seqner, diger, siger) """ for (route,), diger in self.db.rpes.getTopItemIter(): try: tsgs = fetchTsgs(db=self.db.ssgs, diger=diger) keys = (diger.qb64,) dater = self.db.sdts.get(keys=keys) serder = self.db.rpys.get(keys=keys) try: if not (dater and serder and tsgs): raise ValueError( f"Missing escrow artifacts at said={diger.qb64}" f"for route={route}." ) # do date math for stale escrow if (helping.nowUTC() - dater.datetime) > datetime.timedelta( seconds=self.TimeoutRPE ): # escrow stale so raise ValidationError which unescrows below logger.info( "Revery unescrow error: Stale reply escrow at route = %s", route, ) raise ValidationError(f"Stale reply escrow at route = {route}.") self.processReply(serder=serder, tsgs=tsgs) except UnverifiedReplyError as ex: # still waiting on missing prior event to validate if logger.isEnabledFor(logging.TRACE): logger.trace("Revery unescrow attempt failed: %s\n", ex.args[0]) except Exception as ex: # other error so remove from reply escrow self.db.rpes.rem(keys=(route,), val=diger) # remove escrow only self.removeReply(diger) # remove escrow reply artifacts if logger.isEnabledFor(logging.DEBUG): logger.exception( "Revery unescrowed due to error: %s", ex.args[0] ) else: logger.error("Revery unescrowed due to error: %s", ex.args[0]) else: # unescrow succeded self.db.rpes.rem(keys=(route,), val=diger) # remove escrow only logger.info( "Revery unescrow succeeded for reply said=%s", serder.said ) logger.debug("event=\n%s\n", serder.pretty()) except Exception as ex: # log diagnostics errors etc self.db.rpes.rem(keys=(route,), val=diger) # remove escrow only self.removeReply(diger) # remove escrow reply artifacts if logger.isEnabledFor(logging.DEBUG): logger.exception("Revery unescrowed due to error: %s", ex.args[0]) else: logger.error("Revery unescrowed due to error: %s", ex.args[0])
[docs] class Route: """Route class for registration of reply message handlers This class represents a registered route internally to the Revery. the properties are created by using the Falcon compile route utility method Properties: .regex(re): compiled url template regex .fields(set): field names for matches in regex .resource(object): the handler for this route .suffix(Optional(str)): a suffix to be applied to the handler method """
[docs] def __init__(self, regex, fields, resource, suffix=None): """Initialize instance of route Parameters: regex(re): compiled url template regex fields(set): field names for matches in regex resource(object): the handler for this route suffix(Optional(str)): a suffix to be applied to the handler method """ self.regex = regex self.fields = fields self.resource = resource self.suffix = suffix
[docs] def compile_uri_template(template): """Compile the given URI template string into a pattern matcher. This function can be used to construct custom routing engines that iterate through a list of possible routes, attempting to match an incoming request against each route's compiled regular expression. Each field is converted to a named group, so that when a match is found, the fields can be easily extracted using :py:meth:`re.MatchObject.groupdict`. This function does not support the more flexible templating syntax used in the default router. Only simple paths with bracketed field expressions are recognized. For example:: / /books /books/{isbn} /books/{isbn}/characters /books/{isbn}/characters/{name} Also, note that if the template contains a trailing slash character, it will be stripped in order to normalize the routing logic. Args: template(str): The template to compile. Note that field names are restricted to ASCII a-z, A-Z, and the underscore character. Returns: tuple: (template_field_names, template_regex) """ if not isinstance(template, str): raise TypeError("uri_template is not a string") if not template.startswith("/"): raise ValueError("uri_template must start with '/'") if "//" in template: raise ValueError("uri_template may not contain '//'") if template != "/" and template.endswith("/"): template = template[:-1] # template names should be able to start with A-Za-z # but also contain 0-9_ in the remaining portion expression_pattern = r"{([a-zA-Z]\w*)}" # Get a list of field names fields = set(re.findall(expression_pattern, template)) # Convert Level 1 var patterns to equivalent named regex groups escaped = re.sub(r"[\.\(\)\[\]\?\*\+\^\|]", r"\\\g<0>", template) pattern = re.sub(expression_pattern, r"(?P<\1>[^/]+)", escaped) pattern = r"\A" + pattern + r"\Z" return fields, re.compile(pattern, re.IGNORECASE)