Source code for keri.vc.protocoling

# -*- encoding: utf-8 -*-
"""
keri.vc.handling module

"""
import os
from collections import namedtuple

from .. import help
from ..peer import exchanging

logger = help.ogler.getLogger()

Ipexage = namedtuple("Ipexage", 'apply offer agree grant admit spurn')
Ipex = Ipexage(apply="apply", offer="offer", agree="agree", grant="grant", admit="admit", spurn="spurn")
PreviousRoutes = {
    Ipex.offer: (Ipex.apply,),
    Ipex.agree: (Ipex.offer,),
    Ipex.grant: (Ipex.agree,),
    Ipex.admit: (Ipex.grant,),
    Ipex.spurn: (Ipex.apply, Ipex.offer, Ipex.agree, Ipex.grant),
}


[docs] class IpexHandler: """ Processor of `exn` IPEX messages. """
[docs] def __init__(self, resource, hby, notifier): """ Initialize instance Parameters: resource (str): route of messages for this handler hby (Habery): local identifier environment notifier (Notifier): outbound notifications """ self.resource = resource self.hby = hby self.notifier = notifier
[docs] def verify(self, serder, attachments=None): """ Do route specific processsing of IPEX protocol exn messages Parameters: serder (Serder): Serder of the IPEX protocol exn message attachments (list): list of tuples of pather, CESR SAD path attachments to the exn event Returns: bool: True means the exn passed behaviour specific verification for IPEX protocol messages """ route = serder.ked['r'] dig = serder.ked['p'] match route.split("/"): case["", "ipex", Ipex.apply]: if not dig: # Apply messages can only start an IPEX exchange return True case["", "ipex", verb] if verb in (Ipex.offer, Ipex.grant): if not dig: # This is an offer, agree or grant opening an IPEX exchange, no prior return True pserder, _ = exchanging.cloneMessage(self.hby, said=dig) if pserder is None: # previous reference message does not exist return False proute = pserder.ked['r'] pverb = os.path.basename(os.path.normpath(proute)) # Use established PreviousRoutes to determine if this response is valid if pverb not in PreviousRoutes[verb]: return False return self.response(pserder) is None # Make sure we don't have a response already case["", "ipex", verb] if verb in (Ipex.admit, Ipex.agree, Ipex.spurn): if not dig: # Admit and Spurn messages can NOT start an IPEX exchange return False pserder, _ = exchanging.cloneMessage(self.hby, said=dig) if pserder is None: # previous reference message does not exist return False proute = pserder.ked['r'] pverb = os.path.basename(os.path.normpath(proute)) # Use established PreviousRoutes to determine if this response is valid if pverb not in PreviousRoutes[verb]: return False return self.response(pserder) is None # Make sure we don't have a response already return False
[docs] def response(self, serder): """ Return the IPEX exn message sent as a response to the provided serder, if any Parameters: serder (Serder): IPEX exn message to check for a response Returns: """ saider = self.hby.db.erpy.get(keys=(serder.said,)) if saider: rserder, _ = exchanging.cloneMessage(self.hby, saider.qb64) # Clone previous so we reverify the sigs return rserder return None
[docs] def handle(self, serder, attachments=None): """ Do route specific processsing of IPEX protocol exn messages Parameters: serder (Serder): Serder of the IPEX protocol exn message attachments (list): list of tuples of pather, CESR SAD path attachments to the exn event """ attrs = serder.ked["a"] data = dict( r=f"/exn{serder.ked['r']}", d=serder.said, m=attrs["m"] ) self.notifier.add(attrs=data)
[docs] def ipexApplyExn(hab, recp, message, schema, attrs): """ Apply for an ACDC Parameters: hab(Hab): identifier environment for issuer of credential recp (str): qb64 AID of recipient message(str): Human readable message regarding the credential application schema (any): schema or its SAID attrs (any): attribute field label list Returns: Serder: credential issuance exn peer to peer message bytes: attachments for exn message """ data = dict( m=message, s=schema, a=attrs, i=recp ) exn, end = exchanging.exchange(route="/ipex/apply", payload=data, sender=hab.pre) ims = hab.endorse(serder=exn, last=False, pipelined=False) del ims[:exn.size] ims.extend(end) return exn, ims
[docs] def ipexOfferExn(hab, message, acdc, apply=None): """ Offer a metadata ACDC Parameters: hab(Hab): identifier environment for issuer of credential message(str): Human readable message regarding the credential offer acdc (any): metadata ACDC or its SAID apply (Serder): optional IPEX exn apply message that this offer is response to. Returns: Serder: credential issuance exn peer to peer message bytes: attachments for exn message """ data = dict( m=message ) embeds = dict( acdc=acdc ) kwa = dict() if apply is not None: kwa["dig"] = apply.said exn, end = exchanging.exchange(route="/ipex/offer", payload=data, sender=hab.pre, embeds=embeds, **kwa) ims = hab.endorse(serder=exn, last=False, pipelined=False) del ims[:exn.size] ims.extend(end) return exn, ims
[docs] def ipexAgreeExn(hab, message, offer): """ Agree an offer Parameters: hab(Hab): identifier environment for issuer of credential message(str): Human readable message regarding the credential agreement offer (Serder): IPEX exn offer message that this offer is response to. Returns: Serder: credential issuance exn peer to peer message bytes: attachments for exn message """ data = dict( m=message ) exn, end = exchanging.exchange(route="/ipex/agree", payload=data, sender=hab.pre, dig=offer.said) ims = hab.endorse(serder=exn, last=False, pipelined=False) del ims[:exn.size] ims.extend(end) return exn, ims
[docs] def ipexGrantExn(hab, recp, message, acdc, iss=None, anc=None, agree=None, dt=None): """ Disclose an ACDC Parameters: hab(Hab): identifier environment for issuer of credential recp (str) qb64 AID of recipient of GRANT message message(str): Human readable message regarding the credential disclosure acdc (bytes): CESR stream of serialized ACDC with attachments iss (bytes): serialized TEL issuance event anc (bytes): serialized anchoring event in the KEL, either ixn or rot agree (Serder): optional IPEX exn agree message that this grant is response to. dt (str): Iso8601 formatted date string to use for this request Returns: Serder: credential issuance exn peer to peer message bytes: attachments for exn message """ data = dict( m=message, i=recp, ) embeds = dict( acdc=acdc, ) if iss is not None: embeds['iss'] = iss if anc is not None: embeds['anc'] = anc kwa = dict() if agree is not None: kwa['dig'] = agree.said exn, end = exchanging.exchange(route="/ipex/grant", payload=data, sender=hab.pre, embeds=embeds, date=dt, **kwa) ims = hab.endorse(serder=exn, last=False, pipelined=False) del ims[:exn.size] ims.extend(end) return exn, ims
[docs] def ipexAdmitExn(hab, message, grant, dt=None): """ Admit a disclosure Parameters: hab(Hab): identifier environment for issuer of credential message(str): Human readable message regarding the admission grant (Serder): IPEX grant exn message serder dt (str): timestamp Returns: Serder: credential issuance exn peer to peer message bytes: attachments for exn message """ data = dict( m=message, ) exn, end = exchanging.exchange(route="/ipex/admit", payload=data, sender=hab.pre, dig=grant.said, date=dt) ims = hab.endorse(serder=exn, last=False, pipelined=False) del ims[:exn.size] ims.extend(end) return exn, ims
[docs] def ipexSpurnExn(hab, message, spurned): """ Reject an application, offer or agreement Parameters: hab(Hab): identifier environment for issuer of credential message(str): Human readable message regarding the admission spurned (Serder): apply, offer, agree or grant received Returns: Serder: credential issuance exn peer to peer message bytes: attachments for exn message """ data = dict( m=message ) exn, end = exchanging.exchange(route="/ipex/spurn", payload=data, sender=hab.pre, dig=spurned.said) ims = hab.endorse(serder=exn, last=False, pipelined=False) del ims[:exn.size] ims.extend(end) return exn, ims
[docs] def loadHandlers(hby, exc, notifier): """ Load handlers for the IPEX protocol Parameters: hby (Habery): Database and keystore for environment exc (Exchanger): Peer-to-peer message router notifier (Notifier): outbound notifications """ exc.addHandler(IpexHandler(resource="/ipex/apply", hby=hby, notifier=notifier)) exc.addHandler(IpexHandler(resource="/ipex/offer", hby=hby, notifier=notifier)) exc.addHandler(IpexHandler(resource="/ipex/agree", hby=hby, notifier=notifier)) exc.addHandler(IpexHandler(resource="/ipex/grant", hby=hby, notifier=notifier)) exc.addHandler(IpexHandler(resource="/ipex/admit", hby=hby, notifier=notifier)) exc.addHandler(IpexHandler(resource="/ipex/spurn", hby=hby, notifier=notifier))