Source code for keri.vdr.viring

# -*- encoding: utf-8 -*-
"""
keri.db.viring module

VIR  Verifiable Issuance(Revocation) Registry

Provides public simple Verifiable Credential Issuance/Revocation Registry
A special purpose Verifiable Data Registry (VDR)
"""

from dataclasses import dataclass, field, asdict
from  ordered_set import OrderedSet as oset

from ..db import koming, subing, escrowing

from .. import kering
from ..app import signing
from ..core import coring, serdering
from ..db import dbing, basing
from ..db.dbing import snKey
from ..help import helping
from ..vc import proving
from ..vdr import eventing


[docs] class rbdict(dict): """ Reger backed read through cache for registry state Subclass of dict that has db and reger as attributes and employs read through cache from db Reger.stts of registry states to reload tever from state in database when not found in memory as dict item. """ __slots__ = ('db', 'reger') # no .__dict__ just for db reference def __init__(self, *pa, **kwa): super(rbdict, self).__init__(*pa, **kwa) self.db = None self.reger = None def __getitem__(self, k): try: return super(rbdict, self).__getitem__(k) except KeyError as ex: if not self.db or not self.reger: raise ex # reraise KeyError if (rsr := self.reger.states.get(keys=k)) is None: raise ex # reraise KeyError try: tever = eventing.Tever(rsr=rsr, db=self.db, reger=self.reger) except kering.MissingEntryError: # no kel event for keystate raise ex # reraise KeyError super(rbdict, self).__setitem__(k, tever) return tever def __setitem__(self, key, item): super(rbdict, self).__setitem__(key, item) self.reger.states.pin(keys=key, val=item.state()) def __delitem__(self, key): super(rbdict, self).__delitem__(key) self.reger.states.rem(keys=key) def __contains__(self, k): if not super(rbdict, self).__contains__(k): try: self.__getitem__(k) return True except KeyError: return False else: return True
[docs] def get(self, k, default=None): """Override of dict get method Parameters: k (str): key for dict default: default value to return if not found Returns: tever: converted from underlying dict or database """ if not super(rbdict, self).__contains__(k): return default else: return self.__getitem__(k)
[docs] @dataclass class RegistryRecord: """ Registry Key keyed by Registry name """ registryKey: str prefix: str
[docs] @dataclass class RegStateRecord(basing.RawRecord): # reger.state """ Registry Event Log (REL) State information (see reger.state at 'stts' for database that holds these records keyed by Registry SAID, i field) Attributes: vn (list[int]): version number [major, minor] i (str): registry SAID qb64 (registry inception event SAID) s (str): sequence number of latest event in KEL as hex str d (str): latest registry event digest qb64 ii (str): registry issuer identifier aid qb64 dt (str): datetime iso-8601 of registry state record update, usually now et (str): event packet type (ilk) bt (str): backer threshold hex num b (list[str]): backer aids qb64 c (list[str]): config traits Note: the seal anchor dict 'a' field is not included in the state notice because it may be verbose and would impede the main purpose of a notice which is to trigger the download of the latest events, which would include the anchored seals. rsr = viring.RegStateRecord( vn=list(version), # version number as list [major, minor] i=ri, # qb64 registry SAID s="{:x}".format(sn), # lowercase hex string no leading zeros d=said, ii=pre, dt=dts, et=eilk, bt="{:x}".format(toad), # hex string no leading zeros lowercase b=wits, # list of qb64 may be empty c=cnfg if cnfg is not None else [], ) """ vn: list[int] = field(default_factory=list) # version number [major, minor] round trip serializable i: str = '' # identifier prefix qb64 s: str = '0' # sequence number of latest event in KEL as hex str d: str = '' # latest event digest qb64 ii: str = '' # issuer identifier of registry aid qb64 dt: str = '' # datetime of update of state record et: str = '' # TEL evt packet type (ilk) bt: str = '0' # backer threshold hex num str b: list = field(default_factory=list) # backer AID list qb64 c: list[str] = field(default_factory=list) # config trait list
[docs] @dataclass class VcStateRecord(basing.RawRecord): vn: list[str] = field(default_factory=list) # version number [major, minor] round trip serializable i: str = '' # identifier prefix qb64 s: str = '0' # sequence number of latest event in KEL as hex str d: str = '' # latest event digest qb64 ri: str = '' # registry identifier of registry aid qb64 ra: dict = field(default_factory=dict) # registry anchor for registry with backers a: dict = field(default_factory=dict) # seal for anchor in KEL dt: str = '' # datetime of update of state record et: str = '' # TEL evt packet type (ilk)
[docs] def openReger(name="test", **kwa): """ Returns contextmanager generated by openLMDB but with Baser instance Parameters: name (str): registry database name **kwa (dict) keyword arguments to pass to LMDB """ return dbing.openLMDB(cls=Reger, name=name, **kwa)
[docs] class Reger(dbing.LMDBer): """ Reger sets up named sub databases for TEL registry Attributes: see superclass LMDBer for inherited attributes .tvts is named sub DB whose values are serialized TEL events dgKey DB is keyed by identifier prefix plus digest of serialized event Only one value per DB key is allowed .tels is named sub DB of transaction event log tables that map sequence numbers to serialized event digests. snKey Values are digests used to lookup event in .tvts sub DB DB is keyed by identifier prefix plus sequence number of tel event Only one value per DB key is allowed .tibs is named sub DB of indexed backer signatures of event Backers always have nontransferable indetifier prefixes. The index is the offset of the backer into the backer list of the anchored management event wrt the receipted event. dgKey DB is keyed by identifier prefix plus digest of serialized event More than one value per DB key is allowed .oots is named sub DB of out of order escrowed event tables that map sequence numbers to serialized event digests. snKey Values are digests used to lookup event in .tvts sub DB DB is keyed by identifier prefix plus sequence number of key event Only one value per DB key is allowed .baks is named sub DB of ordered list of backers at given point in management TEL. dgKey DB is keyed by identifier prefix plus digest of serialized event More than one value per DB key is allowed .twes is named sub DB of partially witnessed escrowed event tables that map sequence numbers to serialized event digests. snKey Values are digests used to lookup event in .tvts sub DB DB is keyed by identifier prefix plus sequence number of tel event Only one value per DB key is allowed .taes is named sub DB of anchorless escrowed event tables that map sequence numbers to serialized event digests. snKey Values are digests used to lookup event in .tvts sub DB DB is keyed by identifier prefix plus sequence number of tel event Only one value per DB key is allowed .ancs is a named sub DB of anchors to KEL events. Quadlet Each quadruple is concatenation of four fully qualified items of validator. These are: transferable prefix, plus latest establishment event sequence number plus latest establishment event digest, plus indexed event signature. When latest establishment event is multisig then there will be multiple quadruples one per signing key, each a dup at same db key. dgKey DB is keyed by identifier prefix plus digest of serialized event Only one value per DB key is allowed .regs is named subDB instance of Komer that maps registry names to registry keys key is habitat name str value is serialized RegistryRecord dataclass """ TailDirPath = "keri/reg" AltTailDirPath = ".keri/reg" TempPrefix = "keri_reg_"
[docs] def __init__(self, headDirPath=None, reopen=True, **kwa): """ Setup named sub databases. Inherited Parameters: name (str): directory path name differentiator for main database When system employs more than one keri database, name allows differentiating each instance by name temp (boolean,): assign to .temp True then open in temporary directory, clear on close Othewise then open persistent directory, do not clear on close headDirPath (Optional(str)): head directory pathname for main database If not provided use default .HeadDirpath mode (int): numeric os dir permissions for database directory reopen (boolean,): IF True then database will be reopened by this init Notes: dupsort=True for sub DB means allow unique (key,pair) duplicates at a key. Duplicate means that is more than one value at a key but not a redundant copies a (key,value) pair per key. In other words the pair (key,value) must be unique both key and value in combination. Attempting to put the same (key,value) pair a second time does not add another copy. Duplicates are inserted in lexocographic order by value, insertion order. """ self.registries = oset() if "db" in kwa: self._tevers = rbdict() self._tevers.reger = self # assign db for read thorugh cache of kevers self._tevers.db = kwa["db"] else: self._tevers = dict() super(Reger, self).__init__(headDirPath=headDirPath, reopen=reopen, **kwa)
@property def tevers(self): """ Returns ._tevers tevers getter """ return self._tevers
[docs] def reopen(self, **kwa): """ Open sub databases Parameters: **kwa (dict): keyword arguments passed to super.reopen """ super(Reger, self).reopen(**kwa) # Create by opening first time named sub DBs within main DB instance # Names end with "." as sub DB name must include a non Base64 character # to avoid namespace collisions with Base64 identifier prefixes. self.tvts = self.env.open_db(key=b'tvts.') self.tels = self.env.open_db(key=b'tels.') self.ancs = self.env.open_db(key=b'ancs.') self.tibs = self.env.open_db(key=b'tibs.', dupsort=True) self.baks = self.env.open_db(key=b'baks.', dupsort=True) self.oots = self.env.open_db(key=b'oots.') self.twes = self.env.open_db(key=b'twes.') self.taes = self.env.open_db(key=b'taes.') self.tets = subing.CesrSuber(db=self, subkey='tets.', klas=coring.Dater) # Registry state made of RegStateRecord. # Each registry has registry event log keyed by registry identifier self.states = koming.Komer(db=self, schema=RegStateRecord, subkey='stts.') #self.states = subing.SerderSuber(db=self, subkey='stts.') # registry event state # Holds the credential self.creds = subing.SerderSuber(db=self, subkey="creds.", klas=serdering.SerderACDC) # database of anchors to credentials. prefix is either AID with direct credential # anchor or TEL event AID (same as credential SAID) when credential uses revocation registry self.cancs = subing.CatCesrSuber(db=self, subkey='cancs.', klas=(coring.Prefixer, coring.Seqner, coring.Saider)) # all sad path ssgs (sad pathed indexed signature serializations) maps SAD quinkeys # given by quintuple (saider.qb64, path, prefixer.qb64, seqner.q64, diger.qb64) # of credential and trans signer's key state est evt to val Siger for each # signature. self.spsgs = subing.CesrIoSetSuber(db=self, subkey='ssgs.', klas=coring.Siger) # all sad path scgs (sad pathed non-indexed signature serializations) maps # couple (SAD SAID, path) to couple (Verfer, Cigar) of nontrans signer of signature in Cigar # nontrans qb64 of Prefixer is same as Verfer self.spcgs = subing.CatCesrIoSetSuber(db=self, subkey='scgs.', klas=(coring.Verfer, coring.Cigar)) # Index of credentials processed and saved. Indicates fully verified (even if revoked) self.saved = subing.CesrSuber(db=self, subkey='saved.', klas=coring.Saider) # Index of credentials by issuer. My credentials issued, key == hab.pre self.issus = subing.CesrDupSuber(db=self, subkey='issus.', klas=coring.Saider) # Index of credentials by subject. My credentials received, key == hab.pre self.subjs = subing.CesrDupSuber(db=self, subkey='subjs.', klas=coring.Saider) # Index of credentials by schema self.schms = subing.CesrDupSuber(db=self, subkey='schms.', klas=coring.Saider) # Missing reegistry escrow self.mre = subing.CesrSuber(db=self, subkey='mre.', klas=coring.Dater) # Broken chain escrow self.mce = subing.CesrSuber(db=self, subkey='mce.', klas=coring.Dater) # Missing schema escrow self.mse = subing.CesrSuber(db=self, subkey='mse.', klas=coring.Dater) # Collection of sub-dbs for persisting Registry Txn State Notices self.txnsb = escrowing.Broker(db=self, subkey="txn.") # registry keys keyed by Registry name self.regs = koming.Komer(db=self, subkey='regs.', schema=RegistryRecord, ) # TEL partial witness escrow self.tpwe = subing.CatCesrIoSetSuber(db=self, subkey='tpwe.', klas=(coring.Prefixer, coring.Seqner, coring.Saider)) # TEL multisig anchor escrow self.tmse = subing.CatCesrIoSetSuber(db=self, subkey='tmse.', klas=(coring.Prefixer, coring.Seqner, coring.Saider)) # TEL event disemination escrow self.tede = subing.CatCesrIoSetSuber(db=self, subkey='tede.', klas=(coring.Prefixer, coring.Seqner, coring.Saider)) # Completed TEL event self.ctel = subing.CesrSuber(db=self, subkey='ctel.', klas=coring.Saider) # Credential Missing Signature Escrow self.cmse = subing.SerderSuber(db=self, subkey="cmse.", klas=serdering.SerderACDC) # Completed Credentials self.ccrd = subing.SerderSuber(db=self, subkey="ccrd.", klas=serdering.SerderACDC) return self.env
[docs] def cloneCreds(self, saids, db): """ Returns fully expanded credential with chained credentials attached. Parameters: saids (list): of Saider objects: db (Baser): baser object to load schema Returns: list: fully hydrated credentials with full chains provided """ creds = [] for saider in saids: key = saider.qb64 creder, prefixer, seqner, asaider = self.cloneCred(said=key) atc = bytearray(signing.serialize(creder, prefixer, seqner, saider)) del atc[0:creder.size] iss = bytearray(self.cloneTvtAt(creder.said)) iserder = serdering.SerderKERI(raw=iss) issatc = bytes(iss[iserder.size:]) del iss[0:iserder.size] chainSaids = [] for k, p in (creder.edge.items() if creder.edge is not None else {}): if k == "d": continue if not isinstance(p, dict): continue chainSaids.append(coring.Saider(qb64=p["n"])) chains = self.cloneCreds(chainSaids, db) regk = creder.regi status = self.tevers[regk].vcState(saider.qb64) schemer = db.schema.get(creder.schema) cred = dict( sad=creder.sad, atc=atc.decode("utf-8"), iss=iserder.sad, issatc=issatc.decode("utf-8"), pre=creder.issuer, schema=schemer.sed, chains=chains, status=asdict(status), anchor=dict( pre=prefixer.qb64, sn=seqner.sn, d=asaider.qb64 ) ) ctr = coring.Counter(qb64b=iss, strip=True) if ctr.code == coring.CtrDex.AttachedMaterialQuadlets: ctr = coring.Counter(qb64b=iss, strip=True) if ctr.code == coring.CtrDex.SealSourceCouples: coring.Seqner(qb64b=iss, strip=True) saider = coring.Saider(qb64b=iss) anc = db.cloneEvtMsg(pre=creder.issuer, fn=0, dig=saider.qb64b) aserder = serdering.SerderKERI(raw=anc) ancatc = bytes(anc[aserder.size:]) cred['anc'] = aserder.sad cred['ancatc'] = ancatc.decode("utf-8"), creds.append(cred) return creds
[docs] def logCred(self, creder, prefixer, seqner, saider): """ Save the base credential and seals (est evt+sigs quad) with no indices. Parameters: creder (Creder): that contains the credential to process prefixer (Prefixer): prefix (AID or TEL) of event anchoring credential seqner (Seqner): sequence number of event anchoring credential saider (Diger) digest of anchoring event for credential """ key = creder.said self.cancs.pin(keys=key, val=[prefixer, seqner, saider]) self.creds.put(keys=key, val=creder)
[docs] def cloneCred(self, said): """ Load base credential and CESR proof signatures from database. Base credential and all signatures are returned from the credential data store. If root is specified, all signatures are transposed to have that path as the root. This is used to embed the credential in another SAD at the location of the specified root. Parameters: said(str or bytes): qb64 SAID of credential """ creder = self.creds.get(keys=(said,)) prefixer, seqner, saider = self.cancs.get(keys=(said,)) return creder, prefixer, seqner, saider
[docs] def clonePreIter(self, pre, fn=0): """ Iterator of first seen event messages Returns iterator of first seen event messages with attachments for the TEL prefix pre starting at fir`st seen order number, fn. Essentially a replay in first seen order with attachments Parameters: pre (bytes): qb64 identifier prefix of registry state TEL fn (int): first seen ordinal Returns: iterator: bytearray per serializeed event msg """ if hasattr(pre, 'encode'): pre = pre.encode("utf-8") for fn, dig in self.getTelItemPreIter(pre, fn=fn): msg = self.cloneTvt(pre, dig) yield msg
def cloneTvtAt(self, pre, sn=0): snkey = dbing.snKey(pre, sn) dig = self.getTel(key=snkey) return self.cloneTvt(pre, dig) def cloneTvt(self, pre, dig): msg = bytearray() # message atc = bytearray() # attachments dgkey = dbing.dgKey(pre, dig) # get message if not (raw := self.getTvt(key=dgkey)): raise kering.MissingEntryError("Missing event for dig={}.".format(dig)) msg.extend(raw) # add indexed backer signatures to attachments if tibs := self.getTibs(key=dgkey): atc.extend(coring.Counter(code=coring.CtrDex.WitnessIdxSigs, count=len(tibs)).qb64b) for tib in tibs: atc.extend(tib) # add authorizer (delegator/issure) source seal event couple to attachments couple = self.getAnc(dgkey) if couple is not None: atc.extend(coring.Counter(code=coring.CtrDex.SealSourceCouples, count=1).qb64b) atc.extend(couple) # prepend pipelining counter to attachments if len(atc) % 4: raise ValueError("Invalid attachments size={}, nonintegral" " quadlets.".format(len(atc))) pcnt = coring.Counter(code=coring.CtrDex.AttachedMaterialQuadlets, count=(len(atc) // 4)).qb64b msg.extend(pcnt) msg.extend(atc) return msg
[docs] def sources(self, db, creder): """ Returns raw bytes of any source ('e') credential that is in our database Parameters: db (LMDBer): table to search creder (Creder): root credential Returns: list: credential sources as resolved from `e` in creder.crd """ chains = creder.edge if creder.edge is not None else {} saids = [] for key, source in chains.items(): if key == 'd': continue if not isinstance(source, dict): continue saids.append(source['n']) sources = [] for said in saids: screder, prefixer, seqner, saider = self.cloneCred(said=said) atc = bytearray(coring.Counter(coring.CtrDex.SealSourceTriples, count=1).qb64b) atc.extend(prefixer.qb64b) atc.extend(seqner.qb64b) atc.extend(saider.qb64b) sources.append((screder, atc)) sources.extend(self.sources(db, screder)) return sources
[docs] def putTvt(self, key, val): """ Use dgKey() Write serialized VC bytes val to key Does not overwrite existing val if any Returns True If val successfully written Else False Return False if key already exists """ return self.putVal(self.tvts, key, val)
[docs] def setTvt(self, key, val): """ Use dgKey() Write serialized VC bytes val to key Overwrites existing val if any Returns True If val successfully written Else False """ return self.setVal(self.tvts, key, val)
[docs] def getTvt(self, key): """ Use dgKey() Return event at key Returns None if no entry at key """ return self.getVal(self.tvts, key)
[docs] def delTvt(self, key): """ Use dgKey() Deletes value at key. Returns True If key exists in database Else False """ return self.delVal(self.tvts, key)
[docs] def putTel(self, key, val): """ Use snKey() Write serialized VC bytes val to key Does not overwrite existing val if any Returns True If val successfully written Else False Return False if key already exists """ return self.putVal(self.tels, key, val)
[docs] def setTel(self, key, val): """ Use snKey() Write serialized VC bytes val to key Overwrites existing val if any Returns True If val successfully written Else False """ return self.setVal(self.tels, key, val)
[docs] def getTel(self, key): """ Use snKey() Return event at key Returns None if no entry at key """ return self.getVal(self.tels, key)
[docs] def delTel(self, key): """ Use snKey() Deletes value at key. Returns True If key exists in database Else False """ return self.delVal(self.tels, key)
[docs] def getTelItemPreIter(self, pre, fn=0): """ Returns iterator of all (fn, dig) duples in first seen order for all events with same prefix, pre, in database. Items are sorted by fnKey(pre, fn) where fn is first seen order number int. Returns a First Seen Event Log TEL. Returned items are duples of (fn, dig): Where fn is first seen order number int and dig is event digest for lookup in .evts sub db. Raises StopIteration Error when empty. Parameters: pre is bytes of itdentifier prefix fn is int fn to resume replay. Earliset is fn=0 """ return self.getAllOrdItemPreIter(db=self.tels, pre=pre, on=fn)
[docs] def cntTels(self, pre, fn=0): """ Returns count of all (fn, dig) for all events with same prefix, pre, in database. Parameters: pre is bytes of itdentifier prefix fn is int fn to resume replay. Earliset is fn=0 """ if hasattr(pre, "encode"): pre = pre.encode("utf-8") # convert str to bytes return self.cntValsAllPre(db=self.tels, pre=pre, on=fn)
[docs] def getTibs(self, key): """ Use dgKey() Return list of indexed witness signatures at key Returns empty list if no entry at key Duplicates are retrieved in lexocographic order not insertion order. """ return self.getVals(self.tibs, key)
[docs] def getTibsIter(self, key): """ Use dgKey() Return iterator of indexed witness signatures at key Raises StopIteration Error when empty Duplicates are retrieved in lexocographic order not insertion order. """ return self.getValsIter(self.tibs, key)
[docs] def putTibs(self, key, vals): """ Use dgKey() Write each entry from list of bytes indexed witness signatures vals to key Adds to existing signatures at key if any Returns True If no error Apparently always returns True (is this how .put works with dupsort=True) Duplicates are inserted in lexocographic order not insertion order. """ return self.putVals(self.tibs, key, vals)
[docs] def addTib(self, key, val): """ Use dgKey() Add indexed witness signature val bytes as dup to key in db Adds to existing values at key if any Returns True if written else False if dup val already exists Duplicates are inserted in lexocographic order not insertion order. """ return self.addVal(self.tibs, key, val)
[docs] def cntTibs(self, key): """ Use dgKey() Return count of indexed witness signatures at key Returns zero if no entry at key """ return self.cntVals(self.tibs, key)
[docs] def delTibs(self, key, val=b''): """ Use dgKey() Deletes all values at key if val = b'' else deletes dup val = val. Returns True If key exists in database (or key, val if val not b'') Else False """ return self.delVals(self.tibs, key, val)
[docs] def putTwe(self, key, val): """ Use snKey() Write serialized VC bytes val to key Does not overwrite existing val if any Returns True If val successfully written Else False Return False if key already exists """ return self.putVal(self.twes, key, val)
[docs] def setTwe(self, key, val): """ Use snKey() Write serialized VC bytes val to key Overwrites existing val if any Returns True If val successfully written Else False """ return self.setVal(self.twes, key, val)
[docs] def getTwe(self, key): """ Use snKey() Return event at key Returns None if no entry at key """ return self.getVal(self.twes, key)
[docs] def delTwe(self, key): """ Use snKey() Deletes value at key. Returns True If key exists in database Else False """ return self.delVal(self.twes, key)
[docs] def putTae(self, key, val): """ Use snKey() Write serialized VC bytes val to key Does not overwrite existing val if any Returns True If val successfully written Else False Return False if key already exists """ return self.putVal(self.taes, key, val)
[docs] def setTae(self, key, val): """ Use snKey() Write serialized VC bytes val to key Overwrites existing val if any Returns True If val successfully written Else False """ return self.setVal(self.taes, key, val)
[docs] def getTae(self, key): """ Use snKey() Return event at key Returns None if no entry at key """ return self.getVal(self.taes, key)
[docs] def getTaeItemIter(self): """ Return iterator of all items in .taes """ return self.getAllItemIter(self.taes, split=True)
[docs] def delTae(self, key): """ Use snKey() Deletes value at key. Returns True If key exists in database Else False """ return self.delVal(self.taes, key)
[docs] def putOot(self, key, val): """ Use snKey() Write serialized VC bytes val to key Does not overwrite existing val if any Returns True If val successfully written Else False Return False if key already exists """ return self.putVal(self.oots, key, val)
[docs] def setOot(self, key, val): """ Use snKey() Write serialized VC bytes val to key Overwrites existing val if any Returns True If val successfully written Else False """ return self.setVal(self.oots, key, val)
[docs] def getOot(self, key): """ Use snKey() Return event at key Returns None if no entry at key """ return self.getVal(self.oots, key)
[docs] def getOotItemIter(self): """ Return iterator of all items in .taes """ return self.getAllItemIter(self.oots, split=True)
[docs] def delOot(self, key): """ Use snKey() Deletes value at key. Returns True If key exists in database Else False """ return self.delVal(self.oots, key)
[docs] def putAnc(self, key, val): """ Use dgKey() Write serialized VC bytes val to key Does not overwrite existing val if any Returns True If val successfully written Else False Return False if key already exists """ return self.putVal(self.ancs, key, val)
[docs] def setAnc(self, key, val): """ Use dgKey() Write serialized VC bytes val to key Overwrites existing val if any Returns True If val successfully written Else False """ return self.setVal(self.ancs, key, val)
[docs] def getAnc(self, key): """ Use dgKey() Return event at key Returns None if no entry at key """ return self.getVal(self.ancs, key)
[docs] def delAnc(self, key): """ Use dgKey() Deletes value at key. Returns True If key exists in database Else False """ return self.delVal(self.ancs, key)
[docs] def putBaks(self, key, vals): """ Use dgKey() Write each entry from list of bytes prefixes to key Adds to existing backers at key if any Returns True If at least one of vals is added as dup, False otherwise Duplicates are inserted in insertion order. """ return self.putIoVals(self.baks, key, vals)
[docs] def addBak(self, key, val): """ Use dgKey() Add prefix val bytes as dup to key in db Adds to existing values at key if any Returns True If at least one of vals is added as dup, False otherwise Duplicates are inserted in insertion order. """ return self.addIoVal(self.baks, key, val)
[docs] def getBaks(self, key): """ Use dgKey() Return list of backer prefixes at key Returns empty list if no entry at key Duplicates are retrieved in insertion order. """ return self.getIoVals(self.baks, key)
[docs] def getBaksIter(self, key): """ Use dgKey() Return iterator of backer prefixes at key Raises StopIteration Error when empty Duplicates are retrieved in insertion order. """ return self.getIoValsIter(self.baks, key)
[docs] def cntBaks(self, key): """ Use dgKey() Return count of backer prefixes at key Returns zero if no entry at key """ return self.cntIoVals(self.baks, key)
[docs] def delBaks(self, key): """ Use dgKey() Deletes all values at key in db. Returns True If key exists in database Else False """ return self.delIoVals(self.baks, key)
[docs] def delBak(self, key, val): """ Use dgKey() Deletes dup val at key in db. Returns True If dup at exists in db Else False Parameters: key is bytes of key within sub db's keyspace val is dup val (does not include insertion ordering proem) """ return self.delIoVal(self.baks, key, val)
[docs] def buildProof(prefixer, seqner, diger, sigers): """ Create CESR proof attachment from the quadlet of seal plus signatures on the credential Parameters: prefixer (Prefixer) Identifier of the issuer of the credential seqner (Seqner) is the sequence number of the event used to sign the credential diger (Diger) is the digest of the event used to sign the credential sigers (list) are the cryptographic signatures on the credential """ prf = bytearray() prf.extend(coring.Counter(coring.CtrDex.TransIdxSigGroups, count=1).qb64b) prf.extend(prefixer.qb64b) prf.extend(seqner.qb64b) prf.extend(diger.qb64b) prf.extend(coring.Counter(code=coring.CtrDex.ControllerIdxSigs, count=len(sigers)).qb64b) for siger in sigers: prf.extend(siger.qb64b) return prf
[docs] def messagize(creder, proof): """ Create a CESR message format with proof attachment for credential Parameters creder (Creder): instance of credential proof (str): CESR proof attachment Returns: bytearray: serialized credential with attached proof """ craw = bytearray(creder.raw) if len(proof) % 4: raise ValueError("Invalid attachments size={}, nonintegral" " quadlets.".format(len(proof))) craw.extend(coring.Counter(code=coring.CtrDex.AttachedMaterialQuadlets, count=(len(proof) // 4)).qb64b) craw.extend(proof) return craw