# Source code for keri.db.escrowing

# -*- encoding: utf-8 -*-
"""
keri.db.escrowing module

"""
import datetime
import logging
from typing import Type

from hio.help import ogler


from ..help import helping
from ..kering import ValidationError

from .dbing import fetchTsgs
from .subing import CesrSuber, SerderSuber, CesrIoSetSuber, CatCesrIoSetSuber


logger = ogler.getLogger()


class Broker:
    """
    Collection of databases for transaction state notices (TSNs) and handling
    TSN escrows.
    """

    def __init__(self, db, subkey, timeout=3600):
        """
        Initialize Broker with databases for transaction state notices and escrows.

        Parameters:
            db (Reger): TEL event database to make sub databases under
            subkey (str): parent LMDB subkey path to use for all sub databases
            timeout (int): timeout in seconds for escrows, default is 3600
                seconds (1 hour)

        Attributes:
            db (Reger): TEL event database to make sub databases under
            timeout (int): timeout in seconds for escrows
            daterdb (CesrSuber): datetime stamps keyed by notice SAID
            serderdb (SerderSuber): reply messages keyed by notice SAID
            tigerdb (CesrIoSetSuber): indexed signatures keyed by quadruple
                (said, pre, sn, dig)
            cigardb (CatCesrIoSetSuber): non-indexed signature couples
                (Verfer, Cigar) keyed by notice SAID
            escrowdb (CesrIoSetSuber): escrowed notice digests keyed by
                (typ, pre, aid) tuple
            saiderdb (CesrSuber): SAIDs of saved transaction state keyed by
                (pre, aid) tuple
        """
        # Imported at call time rather than at module level; presumably to
        # avoid a circular import between keri.db and keri.core -- TODO confirm.
        from ..core.coring import Cigar, Dater, Diger, Verfer
        from ..core.indexing import Siger

        self.db = db
        self.timeout = timeout

        # State support: datetime stamps and signatures, indexed and
        # non-indexed.

        # Maps the SAID of a state notice to its datetime stamp.
        self.daterdb = CesrSuber(db=self.db, subkey=subkey + '-dts.', klas=Dater)

        # Maps the SAID of a reply message holding a state notice to its
        # serialization. Notices are versioned SADs (with version string) so a
        # Serder is used to deserialize them.
        self.serderdb = SerderSuber(db=self.db, subkey=subkey + '-sns.')

        # RegStateRecords used as basis for registry state notices in replies
        # self.rsrdb = koming.Komer(db=self.db,
        #                           klas=RegStateRecord,
        #                           subkey=subkey + '-sns.')

        # Indexed signatures. Keys are quadruples
        # (said.qb64, prefixer.qb64, f"{seqner.sn:032x}", diger.qb64) of the
        # reply and the trans signer's establishment event; vals are the Siger
        # instances, one per signature.
        self.tigerdb = CesrIoSetSuber(db=self.db, subkey=subkey + '-sgs.', klas=Siger)

        # Non-indexed signatures. Maps the notice SAID to couples
        # (Verfer, Cigar) of the nontrans signer of the signature in Cigar.
        self.cigardb = CatCesrIoSetSuber(db=self.db, subkey=subkey + '-cgs.',
                                         klas=(Verfer, Cigar))

        # Escrow index of notices not yet accepted. Maps (typ, pre, aid) to
        # the digests of the escrowed notices.
        # NOTE(review): unlike its siblings this subkey has no trailing '.'.
        # Left untouched because changing it would rename the sub database.
        self.escrowdb = CesrIoSetSuber(db=self.db, subkey=subkey + '-nes', klas=Diger)

        # SAIDs of successfully saved transaction state notices.
        # Maps key=(prefix, aid) to val=said of transaction state.
        self.saiderdb = CesrSuber(db=self.db, subkey=subkey + '-nas.', klas=Diger)

    @staticmethod
    def _reason(ex):
        """
        Returns the value to log for exception ex: its first argument when it
        has one, otherwise the exception itself.

        Indexing ex.args[0] directly raises IndexError for an exception raised
        without arguments, which would escape from inside the handler.
        """
        return ex.args[0] if ex.args else ex

    def current(self, keys):
        """
        Get the SAID of the successfully saved TSN for keys.

        Parameters:
            keys (tuple): (prefix, aid) tuple used as key into .saiderdb

        Returns:
            The value stored in .saiderdb at keys. .saiderdb is created with
            klas=Diger so this is presumably a Diger instance, or None when
            nothing is stored at keys -- verify against CesrSuber.get.
        """
        return self.saiderdb.get(keys=keys)

    def processEscrowState(self, typ, processReply, extype: Type[Exception]):
        """
        Process escrowed state notices of escrow type typ.

        Escrow entries are keyed by (typ, pre, aid) and hold the digest of the
        escrowed reply. For each entry the stored datetime, reply and
        signatures are reloaded and handed to processReply.

        Outcomes per entry:
            processReply returns    -- entry is removed from escrow
            processReply raises extype -- entry is left in escrow for a later
                                          pass
            any other exception while processing (including missing artifacts
                and stale entries unless extype covers them)
                                    -- entry is removed from escrow
            exception while loading the artifacts
                                    -- entry is removed from escrow and
                                       removeState is called for its digest

        Parameters:
            typ (str): escrow type
            processReply (func): function to call to process each message
                taken out of escrow. Called with keyword arguments serder,
                diger, route, cigars, tsgs and aid.
            extype (Type[Exception]): the expected exception type if the
                message should remain in escrow
        """
        # etyp is the type component of the stored key; named apart from the
        # typ parameter so the parameter is not rebound by the loop.
        for (etyp, pre, aid), diger in self.escrowdb.getTopItemIter(keys=(typ, '')):
            try:
                tsgs = fetchTsgs(db=self.tigerdb, diger=diger)

                keys = (diger.qb64,)
                dater = self.daterdb.get(keys=keys)
                serder = self.serderdb.get(keys=keys)
                vcigars = self.cigardb.get(keys=keys)

                try:
                    if not (dater and serder and (tsgs or vcigars)):
                        msg = f"Missing escrow artifacts at said={diger.qb64} for pre={pre}."
                        logger.info("Broker %s: unescrow error: %s", etyp, msg)
                        raise ValueError(msg)

                    # Re-attach each verfer to its cigar so processReply gets
                    # cigars in the same form escrowStateNotice received them.
                    cigars = []
                    if vcigars:
                        for verfer, cigar in vcigars:
                            cigar.verfer = verfer
                            cigars.append(cigar)

                    # do date math for stale escrow
                    age = helping.nowUTC() - dater.datetime
                    if age > datetime.timedelta(seconds=self.timeout):
                        # escrow stale so raise ValidationError which
                        # unescrows below
                        msg = f"Escrow unescrow error: Stale txn state escrow at pre = {pre}"
                        logger.trace("Broker %s: %s", etyp, msg)
                        raise ValidationError(msg)

                    processReply(serder=serder, diger=diger, route=serder.ked["r"],
                                 cigars=cigars, tsgs=tsgs, aid=aid)

                except extype as ex:
                    # still waiting on missing prior event to validate, so the
                    # entry stays in escrow
                    if logger.isEnabledFor(logging.TRACE):
                        logger.trace("Broker %s: unescrow attempt failed: %s\n",
                                     etyp, self._reason(ex))
                        logger.exception("Broker %s: unescrow attempt failed: %s",
                                         etyp, self._reason(ex))

                except Exception as ex:  # other error so remove from reply escrow
                    self.escrowdb.rem(keys=(etyp, pre, aid), val=diger)  # remove escrow
                    if logger.isEnabledFor(logging.DEBUG):
                        logger.exception("Broker %s: unescrowed due to error: %s",
                                         etyp, self._reason(ex))
                    else:
                        logger.error("Broker %s: unescrowed due to error: %s",
                                     etyp, self._reason(ex))

                else:  # unescrow succeeded
                    self.escrowdb.rem(keys=(etyp, pre, aid), val=diger)  # remove escrow
                    logger.info("Broker %s: unescrow succeeded for txn state=%s",
                                etyp, serder.said)
                    # Only render the body when it will be emitted; a failure
                    # here would otherwise reach the handler below and remove
                    # the state of a notice that was just accepted.
                    if logger.isEnabledFor(logging.DEBUG):
                        logger.debug("TXN State Body=\n%s\n", serder.pretty())

            except Exception as ex:  # log diagnostics errors etc
                self.escrowdb.rem(keys=(etyp, pre, aid), val=diger)  # remove escrow
                self.removeState(diger)
                if logger.isEnabledFor(logging.DEBUG):
                    logger.exception("Broker %s: unescrowed due to error: %s",
                                     etyp, self._reason(ex))
                else:
                    logger.error("Broker %s: unescrowed due to error: %s",
                                 etyp, self._reason(ex))

    def escrowStateNotice(self, *, typ, pre, aid, serder, diger, dater,
                          cigars=None, tsgs=None):
        """
        Escrow reply and its attachments under (typ, pre, aid).

        Parameters:
            typ (str): escrow type
            pre (str): identifier of key state
            aid (str): identifier of authorizer of key state
            serder (Serder): instance of reply msg (SAD)
            diger (Diger): instance from said in serder (SAD)
            dater (Dater): instance from date-time in serder (SAD)
            cigars (list): of Cigar instances that contain nontrans signing
                couple signature in .raw and public key in .verfer
            tsgs (Iterable): of quadruples of form
                (prefixer, seqner, diger, sigers) where:
                    prefixer is pre of trans endorser
                    seqner is sequence number of trans endorser's est evt for
                        keys for sigs
                    diger is digest of trans endorser's est evt for keys for
                        sigs
                    sigers is passed as vals= to .tigerdb.put, so presumably
                        an iterable of indexed sigs from the trans endorser's
                        keys from est evt

        Returns:
            Result of .escrowdb.put for the (typ, pre, aid) entry.
        """
        cigars = cigars if cigars is not None else []
        tsgs = tsgs if tsgs is not None else []

        keys = (diger.qb64,)
        self.daterdb.put(keys=keys, val=dater)  # first one idempotent
        self.serderdb.put(keys=keys, val=serder)  # first one idempotent

        for prefixer, seqner, tsgdiger, sigers in tsgs:  # iterate over each tsg
            quadkeys = (diger.qb64, prefixer.qb64, f"{seqner.sn:032x}", tsgdiger.qb64)
            self.tigerdb.put(keys=quadkeys, vals=sigers)

        for cigar in cigars:  # write each (verfer, cigar) couple to db
            self.cigardb.put(keys=keys, vals=[(cigar.verfer, cigar)])

        return self.escrowdb.put(keys=(typ, pre, aid), vals=[diger])  # does not overwrite

    def updateReply(self, aid, serder, diger, dater):
        """
        Save reply SAD given by serder, its datetime, and record its SAID as
        the transaction state for (serder.sad["a"]["i"], aid).

        Parameters:
            aid (str): identifier of key state
            serder (Serder): instance of reply msg (SAD)
            diger (Diger): instance from said in serder (SAD)
            dater (Dater): instance from date-time in serder (SAD)
        """
        keys = (diger.qb64,)
        # Add source of ksn to the key for DATEs too... (source AID, ksn AID)
        self.daterdb.put(keys=keys, val=dater)  # first one idempotent
        self.serderdb.pin(keys=keys, val=serder)  # overwrite
        # Add source of ksn to the key... (source AID, ksn AID)
        self.saiderdb.pin(keys=(serder.sad["a"]["i"], aid), val=diger)  # overwrite

    def removeState(self, diger):
        """
        Remove all state associated with the given event TSN identified by
        SAID. Does nothing when diger is falsy.

        Parameters:
            diger (Diger): instance whose .qb64 keys the state to remove
        """
        if not diger:
            return

        keys = (diger.qb64,)
        self.tigerdb.trim(keys=(diger.qb64, ""))  # remove whole branch
        self.cigardb.rem(keys=keys)
        self.serderdb.rem(keys=keys)
        self.daterdb.rem(keys=keys)