Source code for keri.app.oobiing

# -*- encoding: utf-8 -*-
"""
keri.kli.common.oobiing module

"""
import datetime
import json
import logging
from collections import namedtuple
from urllib import parse
from urllib.parse import urlparse

import falcon
from hio.base import doing
from hio.help import decking, ogler

from .httping import Clienter,CESR_CONTENT_TYPE
from .organizing import Organizer
from .. import (Vrsn_1_0, Vrsn_2_0, Version, Roles, Schemes, Ilks,
                ValidationError, UnverifiedReplyError,
                ConfigurationError)
from ..help import nowIso8601, fromIso8601, toIso8601, nowUTC
from ..core import (Prefixer, Router, Revery, Kevery,
                    Parser, Schemer, SerderKERI, exchange)
from ..end import OOBI_RE, DOOBI_RE, WOOBI_RE, OOBI_AID_HEADER
from ..recording import OobiRecord, WellKnownAuthN

logger = ogler.getLogger()

Resultage = namedtuple("Resultage", 'resolved failed')  # stream cold start status
Result = Resultage(resolved='resolved', failed='failed')


def loadEnds(app, *, hby, prefix=""):
    oobiEnd = OobiResource(hby=hby)
    app.add_route(prefix + "/oobi", oobiEnd)
    return []


[docs] def loadHandlers(hby, exc, notifier): """ Load handlers for the peer-to-peer delegation protocols Parameters: hby (Habery): Database and keystore for environment exc (Exchanger): Peer-to-peer message router notifier (Notifier): Outbound notifications """ oobireq = OobiRequestHandler(hby=hby, notifier=notifier) exc.addHandler(oobireq)
[docs] class OobiResource: """ Resource for managing OOBIs """
[docs] def __init__(self, hby): """ Create Endpoints for discovery and resolution of OOBIs Parameters: hby (Habery): identifier database environment """ self.hby = hby
[docs] def on_get_alias(self, req, rep, alias=None): """ OOBI GET endpoint Parameters: req: falcon.Request HTTP request rep: falcon.Response HTTP response alias: option route parameter for specific identifier to get .. code-block:: none --- summary: Get OOBI for specific identifier description: Generate OOBI for the identifier of the specified alias and role tags: - OOBIs parameters: - in: path name: alias schema: type: string required: true description: human readable alias for the identifier generate OOBI for - in: query name: role schema: type: string required: true description: role for which to generate OOBI responses: 200: description: An array of Identifier key state information content: application/json: schema: description: Key state information for current identifiers type: object """ hab = self.hby.habByName(alias) if hab is None: rep.status = falcon.HTTP_400 rep.text = "Invalid alias to generate OOBI" return role = req.params["role"] res = dict(role=role) if role in (Roles.witness,): # Fetch URL OOBIs for all witnesses oobis = [] for wit in hab.kever.wits: urls = hab.fetchUrls(eid=wit, scheme=Schemes.http) \ or hab.fetchUrls(eid=wit, scheme=Schemes.https) if not urls: rep.status = falcon.HTTP_404 rep.text = f"unable to query witness {wit}, no http endpoint" return url = urls[Schemes.https] if Schemes.https in urls else urls[Schemes.http] oobis.append(f"{url.rstrip("/")}/oobi/{hab.pre}/witness/{wit}") res["oobis"] = oobis elif role in (Roles.controller,): # Fetch any controller URL OOBIs oobis = [] urls = hab.fetchUrls(eid=hab.pre, scheme=Schemes.https) or hab.fetchUrls(eid=hab.pre, scheme=Schemes.http) if not urls: rep.status = falcon.HTTP_404 rep.text = f"unable to query controller {hab.pre}, no http endpoint" return url = urls[Schemes.https] if Schemes.https in urls else urls[Schemes.http] oobis.append(f"{url.rstrip("/")}/oobi/{hab.pre}/controller") res["oobis"] = oobis else: rep.status = falcon.HTTP_404 return rep.status = falcon.HTTP_200 rep.content_type = "application/json" rep.data = json.dumps(res).encode("utf-8")
[docs] def on_post(self, req, rep): """ Resolve OOBI endpoint. Parameters: req: falcon.Request HTTP request rep: falcon.Response HTTP response .. code-block:: none --- summary: Resolve OOBI and assign an alias for the remote identifier description: Resolve OOBI URL or rpy message by process results of request and assign alias in contact data for resolved identifier tags: - OOBIs requestBody: required: true content: application/json: schema: description: OOBI properties: oobialias: type: string description: alias to assign to the identifier resolved from this OOBI required: false url: type: string description: URL OOBI rpy: type: object description: unsigned KERI rpy event message with endpoints responses: 202: description: OOBI resolution to key state successful """ body = req.get_media() if "url" in body: oobi = body["url"] obr = OobiRecord(date=nowIso8601()) if "oobialias" in body: obr.oobialias = body["oobialias"] self.hby.db.oobis.pin(keys=(oobi,), val=obr) elif "rpy" in body: rep.status = falcon.HTTP_501 rep.text = "'rpy' support not implemented yet'" return else: rep.status = falcon.HTTP_400 rep.text = "invalid OOBI request body, either 'rpy' or 'url' is required" return rep.status = falcon.HTTP_202
[docs] class OobiRequestHandler: """ Handler for oobi notification EXN messages """ resource = "/oobis"
[docs] def __init__(self, hby, notifier): """ Parameters: hby (Habery) database environment of the controller notifier (Notifier) notifier to convert OOBI request exn messages to controller notifications """ self.hby = hby self.notifier = notifier
[docs] def handle(self, serder, attachments=None): """ Do route specific processsing of OOBI request messages Parameters: serder (Serder): Serder of the exn OOBI request message attachments (list): list of tuples of pather, CESR SAD path attachments to the exn event """ src = serder.pre pay = serder.ked['a'] if "oobi" not in pay: print(f"invalid oobi message, missing oobi. evt={serder.ked}") return oobi = pay["oobi"] obr = OobiRecord(date=nowIso8601()) self.hby.db.oobis.pin(keys=(oobi,), val=obr) data = dict( r="/oobi", src=src, oobi=oobi ) purl = parse.urlparse(oobi) params = parse.parse_qs(purl.query) if "name" in params: data["oobialias"] = params["name"][0] self.notifier.add(attrs=data)
[docs] def oobiRequestExn(hab, dest, oobi, version=Version, pvrsn=None, gvrsn=Version, framed=True, nested=False, genusify=False): """Create oobi request exn and attachments Parameters:: dest oobi version (Versionage): KERI protocol default version if psvrsn is None pvrsn (Versionage): KERI protocol version gvrsn (Versionage): CESR Genus version for attachment group codes or nesting group code (useful when serder.gvrsn < 2) gvrsn = max(svrsn, gvrsn) where svrsn = serder.gvrsn if serder.gvrsn else serder.pvrsn framed (bool): True means may assume each message plus its attachments is isolated as frame when parsing so do not need attachment group when messagizing False means may not assume eash message plus its attachments is isolated as frame when parsing so do need attachment group when messagizing nested (bool): True means messagize for non-top level This forces non-native serializion to be embedded in non-native group code False means messagize for top level of stream. This allows bare non-native serialization of message genusify (bool): True means prepend genus version code from gvrsn before serder to override default stream genus version False means do nothing """ data = dict( dest=dest, oobi=oobi ) # Create `exn` peer to peer message to notify other participants UI exn = exchange(sender=hab.pre, route=OobiRequestHandler.resource, modifiers=dict(), attributes=data, version=version, pvrsn=pvrsn, gvrsn=gvrsn) ims = hab.endorse(serder=exn, last=False, gvrsn=gvrsn, framed=framed, nested=nested, genusify=genusify) del ims[:exn.size] return exn, ims
[docs] class Oobiery: """ Resolver for OOBIs """ RetryDelay = 30
[docs] def __init__(self, hby, rvy=None, clienter=None, cues=None): """ DoDoer to handle the request and parsing of OOBIs Parameters: hby (Habery): database environment clienter (Clienter): DoDoer client provider responsible for managing HTTP client requests cues (decking.Deck): outbound cues from processing oobis """ self.hby = hby self.rvy = rvy if self.rvy is not None: self.registerReplyRoutes(self.rvy.rtr) self.clienter = clienter or Clienter() self.org = Organizer(hby=self.hby) # Set up a local parser for returned events from OOBI queries. rtr = Router() rvy = Revery(db=self.hby.db, rtr=rtr) kvy = Kevery(db=self.hby.db, lax=True, local=False, rvy=rvy) kvy.registerReplyRoutes(router=rtr) self.parser = Parser(framed=True, kvy=kvy, rvy=rvy, version=Vrsn_1_0) self.cues = cues if cues is not None else decking.Deck() self.clients = dict() self.doers = [self.clienter, doing.doify(self.scoobiDo)]
[docs] def registerReplyRoutes(self, router): """ Register the routes for processing messages embedded in `rpy` event messages The Oobiery handles rpy messages with the /introduce route by processing the contained oobi Parameters: router(Router): reply message router """ router.addRoute("/introduce", self)
[docs] def processReply(self, *, serder, diger, route, cigars=None, tsgs=None, **kwargs): """ Process one reply message for route = /introduce with either attached nontrans receipt couples in cigars or attached trans indexed sig groups in tsgs. Assumes already validated diger, dater, and route from serder.ked Parameters: serder (SerderKERI): instance of reply msg (SAD) diger (Diger): instance from said in serder (SAD) route (str): reply route cigars (list): of Cigar instances that contain nontrans signing couple signature in .raw and public key in .verfer tsgs (list): tuples (quadruples) of form (prefixer, seqner, diger, [sigers]) where: prefixer is pre of trans endorser seqner is sequence number of trans endorser's est evt for keys for sigs diger is digest of trans endorser's est evt for keys for sigs [sigers] is list of indexed sigs from trans endorser's keys from est evt OobiRecord: date: str = date time of reply message of the introduction Reply Message:: { "v" : "KERI10JSON00011c_", "t" : "rpy", "d": "EZ-i0d8JZAoTNZH3ULaU6JR2nmwyvYAfSVPzhzS6b5CM", "dt": "2020-08-22T17:50:12.988921+00:00", "r" : "/introduce", "a" : { "cid": "ENcOes8_t2C7tck4X4j61fSm0sWkLbZrEZffq7mSn8On", "oobi": "http://localhost:5632/oobi/ENcOes8_t2C7tck4X4j61fSm0sWkLbZrEZffq7mSn8On/witness", } } """ if route != "/introduce": raise ValidationError(f"Usupported route={route} in {Ilks.rpy} " f"msg={serder.ked}.") data = serder.ked['a'] dt = serder.ked["dt"] for k in ("cid", "oobi"): if k not in data: raise ValidationError(f"Missing element={k} from attributes in" f" {Ilks.rpy} msg={serder.ked}.") cider = Prefixer(qb64=data["cid"]) # raises error if unsupported code cid = cider.qb64 # controller authorizing eid at role aid = cid # authorizing attribution id oobi = data["oobi"] url = urlparse(oobi) if url.scheme not in ("http", "https"): raise ValidationError(f"Invalid url scheme for introduced OOBI scheme={url.scheme}") if self.rvy is None: raise ConfigurationError("this oobiery is not configured to handle rpy introductions") # Process BADA RUN but with no previous reply message, always process introductions accepted = self.rvy.acceptReply(serder=serder, saider=diger, route=route, aid=aid, osaider=None, cigars=cigars, tsgs=tsgs) if not accepted: raise UnverifiedReplyError(f"Unverified introduction reply. {serder.ked}") obr = OobiRecord(cid=cid, date=dt) self.hby.db.oobis.put(keys=(oobi,), val=obr)
[docs] def scoobiDo(self, tymth=None, tock=0.0, **kwa): """ Returns doifiable Doist compatibile generator method (doer dog) to process .exc responses and pass them on to the HTTPRespondant Parameters: tymth (function): injected function wrapper closure returned by .tymen() of Tymist instance. Calling tymth() returns associated Tymist .tyme. tock (float): injected initial tock value Usage: add result of doify on this method to doers list """ _ = (yield tock) while True: self.processFlows() yield tock
[docs] def processFlows(self): """ Process OOBI URLs by requesting from the endpoint and parsing the results """ self.processOobis() self.processClients() self.processRetries() self.processMOOBIs()
[docs] def processOobis(self): """ Process OOBI records loaded for discovery There should be only one OOBIERY that minds the OOBI table, this should read from the table like an escrow """ for (url,), obr in self.hby.db.oobis.getTopItemIter(): try: # Don't process OOBIs we've already resolved or are in escrow being retried if ((fnd := self.hby.db.roobi.get(keys=(url,))) is not None and fnd.state == Result.resolved) and \ self.hby.db.eoobi.get(keys=(url,)) is not None: logging.info(f"OOBI {url} already resolved, skipping") self.hby.db.oobis.rem(keys=(url,)) continue purl = parse.urlparse(url) if purl.path == "/oobi": # Self and Blinded Introductions params = parse.parse_qs(purl.query) # If name is hinted in query string, use it as alias if not provided in OOBIRecord if "name" in params and obr.oobialias is None: obr.oobialias = params["name"][0] self.request(url, obr) elif (match := OOBI_RE.match(purl.path)) is not None: # Full CID and optional EID obr.cid = match.group("cid") obr.eid = match.group("eid") obr.role = match.group("role") params = parse.parse_qs(purl.query) # If name is hinted in query string, use it as alias if not provided in OOBIRecord if "name" in params and obr.oobialias is None: obr.oobialias = params["name"][0] self.request(url, obr) elif (match := DOOBI_RE.match(purl.path)) is not None: # Full CID and optional EID obr.said = match.group("said") self.request(url, obr) elif (match := WOOBI_RE.match(purl.path)) is not None: # Well Known obr.cid = match.group("cid") params = parse.parse_qs(purl.query) # If name is hinted in query string, use it as alias if not provided in OOBIRecord if "name" in params and obr.oobialias is None: obr.oobialias = params["name"][0] self.request(url, obr) except ValueError as ex: print(f"error requesting invalid OOBI URL {ex}", url)
[docs] def processClients(self): """ Process Client responses by parsing the messages and removing the client/doer """ for (url,), obr in self.hby.db.coobi.getTopItemIter(): if url not in self.clients: self.request(url, obr) continue client = self.clients[url] if client.responses: response = client.responses.popleft() self.clienter.remove(client) if response["status"] == 404: print(f"{url} not found") self.hby.db.coobi.rem(keys=(url,)) self.hby.db.eoobi.pin(keys=(url,), val=obr) continue elif not response["status"] == 200: print("invalid status for oobi response: {}".format(response["status"])) self.hby.db.coobi.rem(keys=(url,)) obr.state = Result.failed self.hby.db.roobi.put(keys=(url,), val=obr) elif response["headers"]["Content-Type"] in ( CESR_CONTENT_TYPE, "application/json+cesr", "application/cesr+json", ): # CESR Stream response to OOBI (canonical + legacy variants) self.parser.parse(ims=bytearray(response["body"])) if OOBI_AID_HEADER in response["headers"]: obr.cid = response["headers"][OOBI_AID_HEADER] if obr.oobialias is not None and obr.cid: self.org.update(pre=obr.cid, data=dict(alias=obr.oobialias, oobi=url)) self.hby.db.coobi.rem(keys=(url,)) obr.state = Result.resolved self.hby.db.roobi.put(keys=(url,), val=obr) elif response["headers"]["Content-Type"] == "application/schema+json": # Schema response to data OOBI try: schemer = Schemer(raw=bytearray(response["body"])) if schemer.said == obr.said: self.hby.db.schema.pin(keys=(schemer.said,), val=schemer) result = Result.resolved else: result = Result.failed except (ValidationError, ValueError): result = Result.failed obr.state = result self.hby.db.coobi.rem(keys=(url,)) self.hby.db.roobi.put(keys=(url,), val=obr) elif response["headers"]["Content-Type"].startswith("application/json"): # Unsigned rpy OOBI or Schema try: schemer = Schemer(raw=bytearray(response["body"])) if schemer.said == obr.said: self.hby.db.schema.pin(keys=(schemer.said,), val=schemer) result = Result.resolved else: result = Result.failed obr.state = result self.hby.db.coobi.rem(keys=(url,)) self.hby.db.roobi.put(keys=(url,), val=obr) continue except (ValidationError, ValueError): pass try: serder = SerderKERI(raw=bytearray(response["body"])) except ValueError: obr.state = Result.failed self.hby.db.coobi.rem(keys=(url,)) self.hby.db.roobi.put(keys=(url,), val=obr) continue if not serder.ked['t'] == Ilks.rpy: obr.state = Result.failed self.hby.db.coobi.rem(keys=(url,)) self.hby.db.roobi.put(keys=(url,), val=obr) elif serder.ked['r'] in ('/oobi/witness', '/oobi/controller'): self.processMultiOobiRpy(url, serder, obr) else: obr.state = Result.failed self.hby.db.coobi.rem(keys=(url,)) self.hby.db.roobi.put(keys=(url,), val=obr) else: self.hby.db.coobi.rem(keys=(url,)) obr.state = Result.failed self.hby.db.roobi.put(keys=(url,), val=obr) logger.error("invalid content type for oobi response: {}" .format(response["headers"]["Content-Type"])) self.cues.append(dict(kin=obr.state, oobi=url))
[docs] def processMOOBIs(self): """ Process Client responses by parsing the messages and removing the client/doer """ for (url,), obr in self.hby.db.moobi.getTopItemIter(): result = Result.resolved complete = True for oobi in obr.urls: robr = self.hby.db.roobi.get(keys=(oobi,)) if not robr: complete = False break if robr.state == Result.failed: result = Result.failed if complete: obr.state = result self.hby.db.coobi.rem(keys=(url,)) self.hby.db.roobi.put(keys=(url,), val=obr)
[docs] def processRetries(self): """ Process Client responses by parsing the messages and removing the client/doer """ for (url,), obr in self.hby.db.eoobi.getTopItemIter(): last = fromIso8601(obr.date) now = nowUTC() if (now - last) > datetime.timedelta(seconds=self.RetryDelay): obr.date = toIso8601(now) self.hby.db.eoobi.rem(keys=(url,)) self.hby.db.oobis.pin(keys=(url,), val=obr)
def request(self, url, obr): client = self.clienter.request("GET", url=url) if client is None: self.hby.db.oobis.rem(keys=(url,)) print(f"error getting client for {url}, aborting OOBI") return self.clients[url] = client self.hby.db.oobis.rem(keys=(url,)) self.hby.db.coobi.pin(keys=(url,), val=obr) def processMultiOobiRpy(self, url, serder, mobr): data = serder.ked["a"] cid = data["aid"] if cid != mobr.cid: return Result.failed urls = data["urls"] mobr.urls = urls for murl in urls: obr = OobiRecord(date=nowIso8601()) obr.oobialias = mobr.oobialias obr.cid = mobr.cid self.hby.db.oobis.put(keys=(murl,), val=obr) self.hby.db.coobi.rem(keys=(url,)) self.hby.db.moobi.put(keys=(url,), val=mobr)
class Authenticator: def __init__(self, hby, clienter=None): """ Parameters: hby (Habery): Identifier database environment clienter (Clienter): DoDoer client provider responsible for managing HTTP client requests """ self.hby = hby self.clienter = clienter if clienter is not None else Clienter() self.clients = dict() self.doers = [self.clienter, doing.doify(self.authzDo)] def request(self, wurl, obr): client = self.clienter.request("GET", wurl) self.clients[wurl] = client self.hby.db.woobi.rem(keys=(wurl,)) self.hby.db.mfa.pin(keys=(wurl,), val=obr) def addAuthToAid(self, cid, url): now = nowIso8601() wkan = WellKnownAuthN(url=url, dt=now) self.hby.db.wkas.add(keys=(cid,), val=wkan) def authzDo(self, tymth=None, tock=0.0, **kwa): """ Returns doifiable Doist compatibile generator method (doer dog) to process .exc responses and pass them on to the HTTPRespondant Parameters: tymth (function): injected function wrapper closure returned by .tymen() of Tymist instance. Calling tymth() returns associated Tymist .tyme. tock (float): injected initial tock value Usage: add result of doify on this method to doers list """ _ = (yield tock) while True: self.processFlows() yield tock def processFlows(self): """ Process well-known authentication URLs """ self.processWoobis() self.processMultiFactorAuth() def processWoobis(self): """ Process well-known OOBIs saved as multi-factor auth records Process wOOBI URLs by requesting from the endpoint and confirming the results """ for (wurl,), obr in self.hby.db.woobi.getTopItemIter(): # Find any woobis that match and can be used to perform MFA for this resolved AID purl = urlparse(wurl) if (match := WOOBI_RE.match(purl.path)) is not None: cid = match.group("cid") # print(cid, cid in self.hby.kevers) if cid in self.hby.kevers: obr.cid = match.group("cid") self.request(wurl, obr) else: logging.error(f"wurl {wurl} is not a valid well known OOBI for multi-factor auth") self.hby.db.woobi.rem(keys=(wurl,)) def processMultiFactorAuth(self): """ Process Client responses by parsing the messages and removing the client """ for (wurl,), obr in self.hby.db.mfa.getTopItemIter(): if wurl not in self.clients: self.request(wurl, obr) continue client = self.clients[wurl] if client.responses: response = client.responses.popleft() if 200 >= response["status"] <= 399: print(wurl, "succeeded") self.addAuthToAid(obr.cid, wurl) state = Result.resolved else: state = Result.failed obr.state = state self.clienter.remove(client) self.hby.db.mfa.rem(keys=(wurl,)) self.hby.db.rmfa.pin(keys=(wurl,), val=obr)