Source code for keri.app.keeping

# -*- encoding: utf-8 -*-
"""
KERI
keri.app.keeping module

Terminology:
    salt is 128 bit 16 char random bytes used as root entropy to derive seed or secret
    private key same as seed or secret for key pair
    seed or secret or private key is crypto suite length dependent random bytes
    public key

txn.put(
            did.encode(),
            json.dumps(certifiable_data).encode("utf-8")
        )
raw_data = txn.get(did.encode())
    if raw_data is None:
        return None
    return json.loads(raw_data)

ked = json.loads(raw[:size].decode("utf-8"))
raw = json.dumps(ked, separators=(",", ":"), ensure_ascii=False).encode("utf-8")

"""
import math
from collections import namedtuple, deque
from dataclasses import dataclass, asdict, field

import pysodium
from hio.base import doing

from .. import kering
from ..core import coring
from ..db import dbing, subing, koming
from ..help import helping

Algoage = namedtuple("Algoage", 'randy salty group extern')
Algos = Algoage(randy='randy', salty='salty', group="group", extern="extern")  # randy is rerandomize, salty is use salt


[docs] @dataclass() class PubLot: """ Public key list with indexes and datetime created Attributes: pubs (list): list of fully qualified Base64 public keys. Defaults to empty. ridx (int): rotation index of set of public keys at establishment event. Includes of key set at inception event is 0. kidx (int): key index of starting key in key set in sequence wrt to all public keys. Example if each set has 3 keys then ridx 2 has kidx of 2*3 = 6. dt (str): datetime in ISO8601 format of when key set was first created """ pubs: list = field(default_factory=list) # list qb64 public keys. ridx: int = 0 # index of rotation (est event) that uses public key set kidx: int = 0 # index of key in sequence of public keys dt: str = "" # datetime ISO8601 when key set created def __iter__(self): return iter(asdict(self))
[docs] @dataclass() class PreSit: """ Prefix's public key situation (sets of public kets) """ old: PubLot = field(default_factory=PubLot) # previous publot new: PubLot = field(default_factory=PubLot) # newly current publot nxt: PubLot = field(default_factory=PubLot) # next public publot def __iter__(self): return iter(asdict(self))
[docs] @dataclass() class PrePrm: """ Prefix's parameters for creating new key pairs """ pidx: int = 0 # prefix index for this keypair sequence algo: str = Algos.salty # salty default uses indices and salt to create new key pairs salt: str = '' # empty salt used for salty algo. stem: str = '' # default unique path stem for salty algo tier: str = '' # security tier for stretch index salty algo def __iter__(self): return iter(asdict(self))
[docs] @dataclass() class PubSet: """ Prefix's public key set (list) at rotation index ridx """ pubs: list = field(default_factory=list) # list qb64 public keys. def __iter__(self): return iter(asdict(self))
[docs] def riKey(pre, ri): """ Returns bytes DB key from concatenation with '.' of qualified Base64 prefix bytes pre and int ri (rotation index) of key rotation. Inception has ri == 0 """ if hasattr(pre, "encode"): pre = pre.encode("utf-8") # convert str to bytes return (b'%s.%032x' % (pre, ri))
[docs] def openKS(name="test", **kwa): """ Returns contextmanager generated by openLMDB but with Keeper instance as KeyStore default name="test" default temp=True, openLMDB Parameters: cls is Class instance of subclass instance name is str name of LMDBer dirPath so can have multiple databasers at different directory path names thar each use different name temp is Boolean, True means open in temporary directory, clear on close Otherwise open in persistent directory, do not clear on close """ return dbing.openLMDB(cls=Keeper, name=name, **kwa)
[docs] class Keeper(dbing.LMDBer): """ Keeper sets up named sub databases for key pair storage (KS). Methods provide key pair creation, storage, and data signing. Attributes: (inherited) name (str): unique path component used in directory or file path name base (str): another unique path component inserted before name temp (bool): True means use /tmp directory headDirPath is head directory path path is full directory path perm is numeric os permissions for directory and/or file(s) filed (bool): True means .path ends in file. False means .path ends in directory mode (str): file open mode if filed fext (str): file extension if filed file (File) opened is Boolean, True means directory created and if file then file is opened. False otherwise env (lmdb.env): LMDB main (super) database environment readonly (bool): True means open LMDB env as readonly Attributes: gbls (subing.Suber): named sub DB whose values are global parameters for all prefixes Key is parameter labels Value is parameter parameters: aeid (bytes): fully qualified qb64 non-transferable identifier prefix for authentication via signing and asymmetric encryption of secrets using the associated (public, private) key pair. Secrets include both salts and private keys for all key sets in keeper. Defaults to empty which means no authentication or encryption of key sets. pidx (bytes): hex index of next prefix key-pair sequence to be incepted algo (str): default root algorithm for generating key pair salt (bytes): root salt for generating key pairs tier (bytes): default root security tier for root salt pris (subing.CryptSignerSuber): named sub DB whose keys are public key from key pair and values are private keys from key pair Key is public key (fully qualified qb64) Value is private key (fully qualified qb64) pres (subing.CesrSuber): named sub DB whose values are prefixes or first public keys Key is first public key in key sequence for a prefix (fully qualified qb64) Value is prefix or first public key (temporary) (fully qualified qb64 prms (koming.Komer): named sub DB whose values are serialized dicts of PrePrm instance Key is identifier prefix (fully qualified qb64) Value is serialized parameter dict of public key parameters { pidx: , algo: , salt: , stem: , tier: , } sits (koming.Komer): named sub DB whose values are serialized dicts of PreSit instance Key is identifier prefix (fully qualified qb64) Value is serialized parameter dict of public key situation { old: { pubs: ridx:, kidx, dt:}, new: { pubs: ridx:, kidx:, dt:}, nxt: { pubs: ridx:, kidx:, dt:} } .pubs (koming.Komer): named sub DB whose values are serialized lists of public keys Enables lookup of public keys from prefix and ridx for replay of public keys by prefix in establishment event order. Key is prefix.ridx (rotation index as 32 char hex string) use riKey(pre, ri) Value is serialized list of fully qualified public keys that are the current signing keys after the rotation given by rotation index Properties: """ TailDirPath = "keri/ks" AltTailDirPath = ".keri/ks" TempPrefix = "keri_ks_" MaxNamedDBs = 10
[docs] def __init__(self, headDirPath=None, perm=None, reopen=False, **kwa): """ Setup named sub databases. Inherited Parameters: name is str directory path name differentiator for main database When system employs more than one keri database, name allows differentiating each instance by name default name='main' temp is boolean, assign to .temp True then open in temporary directory, clear on close Othewise then open persistent directory, do not clear on close default temp=False headDirPath is optional str head directory pathname for main database If not provided use default .HeadDirpath default headDirPath=None so uses self.HeadDirPath perm is numeric optional os dir permissions mode default perm=None so do not set mode reopen is boolean, IF True then database will be reopened by this init default reopen=True Notes: dupsort=True for sub DB means allow unique (key,pair) duplicates at a key. Duplicate means that is more than one value at a key but not a redundant copies a (key,value) pair per key. In other words the pair (key,value) must be unique both key and value in combination. Attempting to put the same (key,value) pair a second time does not add another copy. Duplicates are inserted in lexocographic order by value, insertion order. """ if perm is None: perm = self.Perm # defaults to restricted permissions for non temp super(Keeper, self).__init__(headDirPath=headDirPath, perm=perm, reopen=reopen, **kwa)
[docs] def reopen(self, **kwa): """ Open sub databases """ opened = super(Keeper, self).reopen(**kwa) # Create by opening first time named sub DBs within main DB instance # Names end with "." as sub DB name must include a non Base64 character # to avoid namespace collisions with Base64 identifier prefixes. self.gbls = subing.Suber(db=self, subkey='gbls.') self.pris = subing.CryptSignerSuber(db=self, subkey='pris.') self.prxs = subing.CesrSuber(db=self, subkey='prxs.', klas=coring.Cipher) self.nxts = subing.CesrSuber(db=self, subkey='nxts.', klas=coring.Cipher) self.smids = subing.CatCesrIoSetSuber(db=self, subkey='smids.', klas=(coring.Prefixer, coring.Seqner)) self.rmids = subing.CatCesrIoSetSuber(db=self, subkey='rmids.', klas=(coring.Prefixer, coring.Seqner)) self.pres = subing.CesrSuber(db=self, subkey='pres.', klas=coring.Prefixer) self.prms = koming.Komer(db=self, subkey='prms.', schema=PrePrm,) # New Prefix Parameters self.sits = koming.Komer(db=self, subkey='sits.', schema=PreSit,) # Prefix Situation self.pubs = koming.Komer(db=self, subkey='pubs.', schema=PubSet,) # public key set at pre.ridx return self.opened
[docs] class KeeperDoer(doing.Doer): """ Basic Keeper Doer ( LMDB Database ) Inherited Attributes: .done is Boolean completion state: True means completed Otherwise incomplete. Incompletion maybe due to close or abort. Attributes: .keeper is Keeper or LMDBer subclass Inherited Properties: .tyme is float relative cycle time of associated Tymist .tyme obtained via injected .tymth function wrapper closure. .tymth is function wrapper closure returned by Tymist .tymeth() method. When .tymth is called it returns associated Tymist .tyme. .tymth provides injected dependency on Tymist tyme base. .tock is float, desired time in seconds between runs or until next run, non negative, zero means run asap Properties: Methods: .wind injects ._tymth dependency from associated Tymist to get its .tyme .__call__ makes instance callable Appears as generator function that returns generator .do is generator method that returns generator .enter is enter context action method .recur is recur context action method or generator method .exit is exit context method .close is close context method .abort is abort context method Hidden: ._tymth is injected function wrapper closure returned by .tymen() of associated Tymist instance that returns Tymist .tyme. when called. ._tock is hidden attribute for .tock property """
[docs] def __init__(self, keeper, **kwa): """ Parameters: keeper is Keeper instance """ super(KeeperDoer, self).__init__(**kwa) self.keeper = keeper
def enter(self): """""" if not self.keeper.opened: self.keeper.reopen() def exit(self): """""" self.keeper.close(clear=self.keeper.temp)
[docs] class Creator: """ Class for creating a key pair based on algorithm. Attributes: Properties: Methods: .create is method to create key pair Hidden: """
[docs] def __init__(self, **kwa): """ Setup Creator. Parameters: """
[docs] def create(self, **kwa): """ Returns tuple of signers one per key pair """ return []
@property def salt(self): """ salt property getter """ return '' @property def stem(self): """ stem property getter """ return '' @property def tier(self): """ tier property getter """ return ''
[docs] class RandyCreator(Creator): """ Class for creating a key pair based on re-randomizing each seed algorithm. Attributes: Properties: Methods: .create is method to create key pair Hidden: """
[docs] def __init__(self, **kwa): """ Setup Creator. Parameters: """ super(RandyCreator, self).__init__(**kwa)
[docs] def create(self, codes=None, count=1, code=coring.MtrDex.Ed25519_Seed, transferable=True, **kwa): """ Returns list of signers one per kidx in kidxs Parameters: codes is list of derivation codes one per key pair to create count is count of key pairs to create is codes not provided code is derivation code to use for count key pairs if codes not provided transferable is Boolean, True means use trans deriv code. Otherwise nontrans """ signers = [] if not codes: # if not codes make list len count of same code codes = [code for i in range(count)] for code in codes: signers.append(coring.Signer(code=code, transferable=transferable)) return signers
[docs] class SaltyCreator(Creator): """ Class for creating a key pair based on random salt plus path stretch algorithm. Attributes: .salter is salter instance Properties: Methods: .create is method to create key pair Hidden: ._salter holds instance for .salter property """
[docs] def __init__(self, salt=None, stem=None, tier=None, **kwa): """ Setup Creator. Parameters: salt is unique salt from which to derive private key stem is path modifier used with salt to derive private keys. if stem is None then uses pidx tier is derivation criticality that determines how much hashing to use. """ super(SaltyCreator, self).__init__(**kwa) self.salter = coring.Salter(qb64=salt, tier=tier) self._stem = stem if stem is not None else ''
@property def salt(self): """ salt property getter """ return self.salter.qb64 @property def stem(self): """ stem property getter """ return self._stem @property def tier(self): """ tier property getter """ return self.salter.tier
[docs] def create(self, codes=None, count=1, code=coring.MtrDex.Ed25519_Seed, pidx=0, ridx=0, kidx=0, transferable=True, temp=False, **kwa): """ Returns list of signers one per kidx in kidxs Parameters: codes is list of derivation codes one per key pair to create count is count of key pairs to create is codes not provided code is derivation code to use for count key pairs if codes not provided pidx is int prefix index for key pair sequence ridx is int rotation index for key pair set kidx is int starting key index for key pair set transferable is Boolean, True means use trans deriv code. Otherwise nontrans temp is Boolean True means use temp stretch otherwise use time set by tier for streching """ signers = [] if not codes: # if not codes make list len count of same code codes = [code for i in range(count)] stem = self.stem if self.stem else "{:x}".format(pidx) # if not stem use pidx for i, code in enumerate(codes): path = "{}{:x}{:x}".format(stem, ridx, kidx + i) signers.append(self.salter.signer(path=path, code=code, transferable=transferable, tier=self.tier, temp=temp)) return signers
[docs] class Creatory: """ Factory class for creating Creator subclasses to create key pairs based on the provided algorithm. Usage: creator = Creatory(algo='salty').make(salt=b'0123456789abcdef') Attributes: Properties: Methods: .create is method to create key pair Hidden: ._create is method reference set to one of algorithm methods ._novelCreate ._indexCreate """
[docs] def __init__(self, algo=Algos.salty): """ Setup Creator. Parameters: algo is str code for algorithm """ if algo == Algos.randy: self._make = self._makeRandy elif algo == Algos.salty: self._make = self._makeSalty else: raise ValueError("Unsupported creation algorithm ={}.".format(algo))
[docs] def make(self, **kwa): """ Returns Creator subclass based on inited algo """ return (self._make(**kwa))
def _makeRandy(self, **kwa): """ """ return RandyCreator(**kwa) def _makeSalty(self, **kwa): """ """ return SaltyCreator(**kwa)
# default values to init manager's globals database Initage = namedtuple("Initage", 'aeid pidx salt tier')
[docs] class Manager: """Manages key pairs creation, storage, and signing Class for managing key pair creation, storage, retrieval, and message signing. Attributes: ks (Keeper): key store LMDB database instance for storing public and private keys encrypter (coring.Encrypter): instance for encrypting secrets. Public encryption key is derived from aeid (public signing key) decrypter (coring.Decrypter): instance for decrypting secrets. Private decryption key is derived seed (private signing key seed) inited (bool): True means fully initialized wrt database. False means not yet fully initialized Attributes (Hidden): _seed (str): qb64 private-signing key (seed) for the aeid from which the private decryption key is derived. If aeid stored in database is not empty then seed may required to do any key management operations. The seed value is memory only and MUST NOT be persisted to the database for the manager with which it is used. It MUST only be loaded once when the process that runs the Manager is initialized. Its presence acts as an authentication, authorization, and decryption secret for the Manager and must be stored on another device from the device that runs the Manager. Properties: aeid (str): authentication and encryption fully qualified qb64 non-transferable identifier prefix for authentication via signing and asymmetric encryption of secrets using the associated (public, private) key pair. Secrets include both salts and private keys for all key sets in keeper. Defaults to empty which means no authentication or encryption of key sets. Use initial attribute because keeper may not be open on init. pidx (int): pidx prefix index. Use initial attribute because keeper may not be open on init. Each sequence gets own pidx. Enables unique recreatable salting of key sequence based on pidx. salt (str): qb64 of root salt. Makes random root salt if not provided initial salt. Use inital attribute because keeper may not be open on init. tier (str): initial security tier as value of Tierage. Use initial attribute because keeper may not be open on init Methods: """
[docs] def __init__(self, *, ks=None, seed=None, **kwa): """ Setup Manager. Parameters: ks (Keeper): key store instance (LMDB) seed (str): qb64 private-signing key (seed) for the aeid from which the private decryption key may be derived. If aeid stored in database is not empty then seed may required to do any key management operations. The seed value is memory only and MUST NOT be persisted to the database for the manager with which it is used. It MUST only be loaded once when the process that runs the Manager is initialized. Its presence acts as an authentication, authorization, and decryption secret for the Manager and must be stored on another device from the device that runs the Manager. Currently only code MtrDex.Ed25519_Seed is supported. Parameters: Passthrough to .setup for later initialization aeid (str): qb64 of non-transferable identifier prefix for authentication and encryption of secrets in keeper. If provided aeid (not None) and different from aeid stored in database then all secrets are re-encrypted using new aeid. In this case the provided seed must not be empty. A change in aeid should require a second authentication mechanism besides the seed. pidx (int): index of next new created key pair sequence bound to a given identifier prefix. Each sequence gets own pidx. Enables unique recreatable salting of key sequence based on pidx. salt (str): qb64 of root salt. Makes random root salt if not provided tier (str): default security tier (Tierage) for root salt """ self.ks = ks if ks is not None else Keeper(reopen=True) self.encrypter = None self.decrypter = None self._seed = seed if seed is not None else "" self.inited = False # save keyword arg parameters to init later if db not opened yet self._inits = kwa if self.ks.opened: # allows keeper db to opened asynchronously self.setup(**self._inits) # first call to .setup with initialize database
[docs] def setup(self, aeid=None, pidx=None, algo=None, salt=None, tier=None): """ Setups manager root or global attributes and properties Assumes that .keeper db is open. If .keeper.gbls sub database has not been initialized for the first time then initializes from ._inits. This allows dependency injection of keepr db into manager instance prior to keeper db being opened to accomodate asynchronous process setup of db resources. Putting the db initialization here enables asynchronous opening of keeper db after keeper instance is instantiated. First call to .setup will initialize keeper db defaults if never before initialized (vacuous initialization). Parameters: aeid (str): qb64 of non-transferable identifier prefix for authentication and encryption of secrets in keeper. If provided aeid (not None) and different from aeid stored in database then all secrets are re-encrypted using new aeid. In this case the provided seed must not be empty. A change in aeid should require a second authentication mechanism besides the secret. aeid same as current aeid no change innocuous aeid different but empty which unencrypts and removes aeid aeid different not empty which reencrypts and updates aeid pidx (int): index of next new created key pair sequence for given identifier prefix algo (str): root algorithm (randy or salty) for creating key pairs salt (str): qb64 of root salt. Makes random root salt if not provided tier (str): default security tier (Tierage) for root salt """ if not self.ks.opened: raise kering.ClosedError("Attempt to setup Manager closed keystore" " database .ks.") if aeid is None: aeid = '' if pidx is None: pidx = 0 if algo is None: algo = Algos.salty if salt is None: salt = coring.Salter().qb64 else: if coring.Salter(qb64=salt).qb64 != salt: raise ValueError(f"Invalid qb64 for salt={salt}.") if tier is None: tier = coring.Tiers.low # update database if never before initialized if self.pidx is None: # never before initialized self.pidx = pidx # init to default if self.algo is None: # never before initialized self.algo = algo if self.salt is None: # never before initialized self.salt = salt if self.tier is None: # never before initialized self.tier = tier # init to default # must do this after salt is initialized so gets re-encrypted correctly if not self.aeid: # never before initialized self.updateAeid(aeid, self.seed) else: self.encrypter = coring.Encrypter(verkey=self.aeid) # derive encrypter from aeid if not self.seed or not self.encrypter.verifySeed(self.seed): raise kering.AuthError("Last seed missing or provided last seed " "not associated with last aeid={}." "".format(self.aeid)) self.decrypter = coring.Decrypter(seed=self.seed) self.inited = True
[docs] def updateAeid(self, aeid, seed): """ Given seed belongs to aeid and encrypter, update aeid and re-encrypt all secrets Parameters: aeid (Optional(str)): qb64 of new auth encrypt id (public signing key) aeid may match current aeid no change innocuous aeid may be empty which unencrypts and removes aeid aeid may be different not empty which reencrypts seed (str): qb64 of new seed from which new aeid is derived (private signing key seed) """ if self.aeid: # check that last current seed matches last current .aeid # verifies seed belongs to aeid if not self.seed or not self.encrypter.verifySeed(self.seed): raise kering.AuthError("Last seed missing or provided last seed " "not associated with last aeid={}." "".format(self.aeid)) if aeid: # aeid provided if aeid != self.aeid: # changing to a new aeid so update .encrypter self.encrypter = coring.Encrypter(verkey=aeid) # derive encrypter from aeid # verifies new seed belongs to new aeid if not seed or not self.encrypter.verifySeed(seed): raise kering.AuthError("Seed missing or provided seed not associated" " with provided aeid={}.".format(aeid)) else: # changing to empty aeid so new encrypter is None self.encrypter = None # fetch all secrets from db, decrypt all secrets with self.decrypter # unless they decrypt automatically on fetch and then re-encrypt with # encrypter update db with re-encrypted values # re-encypt root salt secret, .salt property is automatically decrypted on fetch if (salt := self.salt) is not None: # decrypted salt self.salt = salt # self.salt = self.encrypter.encrypt(ser=salt).qb64 if self.encrypter else salt # other secrets if self.decrypter: # re-encrypt root salt secrets by prefix parameters .prms for keys, data in self.ks.prms.getItemIter(): # keys is tuple of pre qb64 if data.salt: salter = self.decrypter.decrypt(ser=data.salt) data.salt = (self.encrypter.encrypt(matter=salter).qb64 if self.encrypter else salter.qb64) self.ks.prms.pin(keys, val=data) # private signing key seeds # keys is tuple == (verkey.qb64,) .pris database auto decrypts for keys, signer in self.ks.pris.getItemIter(decrypter=self.decrypter): self.ks.pris.pin(keys, signer, encrypter=self.encrypter) self.ks.gbls.pin("aeid", aeid) # set aeid in db self._seed = seed # set .seed in memory # update .decrypter self.decrypter = coring.Decrypter(seed=seed) if seed else None
@property def seed(self): """ seed property getter from ._seed. seed (str): qb64 from which aeid is derived """ return self._seed @property def aeid(self): """ aeid property getter from key store db. Assumes db initialized. aeid is qb64 auth encrypt id prefix """ return self.ks.gbls.get('aeid') @property def pidx(self): """ pidx property getter from key store db. Assumes db initialized. pidx is prefix index int for next new key sequence """ if (pidx := self.ks.gbls.get("pidx")) is not None: return int(pidx, 16) return pidx # None @pidx.setter def pidx(self, pidx): """ pidx property setter to key store db. pidx is prefix index int for next new key sequence """ self.ks.gbls.pin("pidx", "%x" % pidx) @property def algo(self): """ also property getter from key store db. Assumes db initialized. algo is default root algorithm for creating key pairs """ return self.ks.gbls.get('algo') @algo.setter def algo(self, algo): """ algo property setter to key store db. algo is default root algorithm for creating key pairs """ self.ks.gbls.pin('algo', algo) @property def salt(self): """ salt property getter from key store db. Assumes db initialized. salt is default root salt for new key sequence creation """ salt = self.ks.gbls.get('salt') if self.decrypter: # given .decrypt secret salt must be encrypted in db return self.decrypter.decrypt(ser=salt).qb64 return salt @salt.setter def salt(self, salt): """ salt property setter to key store db. Parameters: salt (str): qb64 default root salt for new key sequence creation may be plain text or cipher text handled by updateAeid """ if self.encrypter: salt = self.encrypter.encrypt(ser=salt).qb64 self.ks.gbls.pin('salt', salt) @property def tier(self): """ tier property getter from key store db. Assumes db initialized. tier is default root security tier for new key sequence creation """ return self.ks.gbls.get('tier') @tier.setter def tier(self, tier): """ tier property setter to key store db. tier is default root security tier for new key sequence creation """ self.ks.gbls.pin('tier', tier)
[docs] def incept(self, icodes=None, icount=1, icode=coring.MtrDex.Ed25519_Seed, ncodes=None, ncount=1, ncode=coring.MtrDex.Ed25519_Seed, dcode=coring.MtrDex.Blake3_256, algo=None, salt=None, stem=None, tier=None, rooted=True, transferable=True, temp=False): """ Returns tuple (verfers, digers) for inception event where verfers is list of current public key verfers public key is verfer.qb64 digers is list of next public key digers digest to xor is diger.raw Incept a prefix. Use first public key as temporary prefix. Must .repre later to move pubsit dict to correct permanent prefix. Store the dictified PreSit in the keeper under the first public key Parameters: icodes is list of private key derivation codes qb64 str one per incepting key pair icount is int count of incepting public keys when icodes not provided icode is str derivation code qb64 of all icount incepting private keys when icodes list not provided ncodes is list of private key derivation codes qb64 str one per next key pair ncount is int count of next public keys when ncodes not provided ncode is str derivation code qb64 of all ncount next public keys when ncodes not provided dcode is str derivation code qb64 of next digers. Default is MtrDex.Blake3_256 algo is str key creation algorithm code salt is str qb64 salt for randomization when salty algorithm used stem is path modifier used with salt to derive private keys when using salty agorithms. if stem is None then uses pidx tier is str security criticality tier code when using salty algorithm rooted is Boolean true means derive incept salt from root salt when incept salt not provided. Otherwise use incept salt only transferable is Boolean, True means each public key uses transferable derivation code. Default is transferable. Special case is non-transferable Use case for incept to use transferable = False is for basic derivation of non-transferable identifier prefix. When the derivation process of the identifier prefix is transferable then one should not use non-transferable for the associated public key(s). temp is Boolean. True is temporary for testing. It modifies tier of salty algorithm When both ncodes is empty and ncount is 0 then the nxt is null and will not be rotatable. This makes the identifier non-transferable in effect even when the identifier prefix is transferable. """ # get root defaults to initialize key sequence if rooted and algo is None: # use root algo from db as default algo = self.algo if rooted and salt is None: # use root salt from db instead of random salt salt = self.salt if rooted and tier is None: # use root tier from db as default tier = self.tier pidx = self.pidx # get next pidx ridx = 0 # rotation index kidx = 0 # key pair index creator = Creatory(algo=algo).make(salt=salt, stem=stem, tier=tier) if not icodes: # all same code, make list of len icount of same code if icount <= 0: raise ValueError("Invalid icount={} must be > 0.".format(icount)) icodes = [icode for i in range(icount)] isigners = creator.create(codes=icodes, pidx=pidx, ridx=ridx, kidx=kidx, transferable=transferable, temp=temp) verfers = [signer.verfer for signer in isigners] if not ncodes: # all same code, make list of len ncount of same code if ncount < 0: # next may be zero if non-trans raise ValueError("Invalid ncount={} must be >= 0.".format(ncount)) ncodes = [ncode for i in range(ncount)] # count set to 0 to ensure does not create signers if ncodes is empty nsigners = creator.create(codes=ncodes, count=0, pidx=pidx, ridx=ridx+1, kidx=kidx+len(icodes), transferable=transferable, temp=temp) digers = [coring.Diger(ser=signer.verfer.qb64b, code=dcode) for signer in nsigners] # Secret to encrypt here pp = PrePrm(pidx=pidx, algo=algo, stem=creator.stem, tier=creator.tier) if creator.salt: pp.salt = (creator.salt if not self.encrypter else self.encrypter.encrypt(ser=creator.salt).qb64) dt = helping.nowIso8601() ps = PreSit( new=PubLot(pubs=[verfer.qb64 for verfer in verfers], ridx=ridx, kidx=kidx, dt=dt), nxt=PubLot(pubs=[signer.verfer.qb64 for signer in nsigners], ridx=ridx+1, kidx=kidx+len(icodes), dt=dt)) pre = verfers[0].qb64b if not self.ks.pres.put(pre, val=coring.Prefixer(qb64=pre)): raise ValueError("Already incepted pre={}.".format(pre.decode("utf-8"))) if not self.ks.prms.put(pre, val=pp): raise ValueError("Already incepted prm for pre={}.".format(pre.decode("utf-8"))) self.pidx = pidx + 1 # increment for next inception if not self.ks.sits.put(pre, val=ps): raise ValueError("Already incepted sit for pre={}.".format(pre.decode("utf-8"))) for signer in isigners: # store secrets (private key val keyed by public key) self.ks.pris.put(keys=signer.verfer.qb64b, val=signer, encrypter=self.encrypter) self.ks.pubs.put(riKey(pre, ri=ridx), val=PubSet(pubs=ps.new.pubs)) for signer in nsigners: # store secrets (private key val keyed by public key) self.ks.pris.put(keys=signer.verfer.qb64b, val=signer, encrypter=self.encrypter) # store publics keys for lookup of private key for replay self.ks.pubs.put(riKey(pre, ri=ridx+1), val=PubSet(pubs=ps.nxt.pubs)) return (verfers, digers)
[docs] def move(self, old, new): """ Assigns new pre to old default .pres at old Moves PrePrm and PreSit dicts in keeper db from old default pre to new pre db key The new pre is the newly derived prefix which may only be known some time after the original creation of the associated key pairs. Paraameters: old is str for old prefix of pubsit dict in keeper db new is str for new prefix to move pubsit dict to in keeper db """ if old == new: return if self.ks.pres.get(old) is None: raise ValueError("Nonexistent old pre={}, nothing to assign.".format(old)) if self.ks.pres.get(new) is not None: raise ValueError("Preexistent new pre={} may not clobber.".format(new)) if (oldprm := self.ks.prms.get(old)) is None: raise ValueError("Nonexistent old prm for pre={}, nothing to move.".format(old)) if self.ks.prms.get(new) is not None: raise ValueError("Preexistent new prm for pre={} may not clobber.".format(new)) if (oldsit := self.ks.sits.get(old)) is None: raise ValueError("Nonexistent old sit for pre={}, nothing to move.".format(old)) if self.ks.sits.get(new) is not None: raise ValueError("Preexistent new sit for pre={} may not clobber.".format(new)) if not self.ks.prms.put(new, val=oldprm): raise ValueError("Failed moving prm from old pre={} to new pre={}.".format(old, new)) else: self.ks.prms.rem(old) if not self.ks.sits.put(new, val=oldsit): raise ValueError("Failed moving sit from old pre={} to new pre={}.".format(old, new)) else: self.ks.sits.rem(old) # move .pubs entries if any i = 0 while (pl := self.ks.pubs.get(riKey(old, i))): if not self.ks.pubs.put(riKey(new, i), val=pl): raise ValueError("Failed moving pubs at pre={} ri={} to new" " pre={}".format(old, i, new)) i += 1 # assign old if not self.ks.pres.pin(old, val=coring.Prefixer(qb64=new)): raise ValueError("Failed assiging new pre={} to old pre={}.".format(new, old)) # make new so that if move again we reserve each one if not self.ks.pres.put(new, val=coring.Prefixer(qb64=new)): raise ValueError("Failed assiging new pre={}.".format(new))
[docs] def rotate(self, pre, ncodes=None, ncount=1, ncode=coring.MtrDex.Ed25519_Seed, dcode=coring.MtrDex.Blake3_256, transferable=True, temp=False, erase=True): """ Returns tuple (verfers, digers) for rotation event of keys for pre where verfers is list of current public key verfers public key is verfer.qb64 digers is list of next public key digers digest to xor is diger.raw Rotate a prefix. Store the updated dictified PreSit in the keeper under pre Parameters: pre (str) qb64 of prefix ncodes (list): of private key derivation codes qb64 str one per next key pair ncount (int): count of next public keys when icodes not provided ncode (str): derivation code qb64 of all ncount next public keys when ncodes not provided dcode i(str): derivation code qb64 of next key digest of digers Default is MtrDex.Blake3_256 transferable (bool): True means each public key uses transferable derivation code. Default is transferable. Special case is non-transferable Normally no use case for rotation to use transferable = False. When the derivation process of the identifier prefix is transferable then one should not use transferable = False for the associated public key(s). temp (bool): True is temporary for testing. It modifies tier of salty algorithm erase (bool): True means erase old private keys made stale by rotation When both ncodes is empty and ncount is 0 then the nxt is null and will not be rotatable. This makes the identifier non-transferable in effect even when the identifier prefix is transferable. """ # Secret to decrypt here if (pp := self.ks.prms.get(pre)) is None: raise ValueError("Attempt to rotate nonexistent pre={}.".format(pre)) if (ps := self.ks.sits.get(pre)) is None: raise ValueError("Attempt to rotate nonexistent pre={}.".format(pre)) if not ps.nxt.pubs: # empty nxt public keys so non-transferable prefix raise ValueError("Attempt to rotate nontransferable pre={}.".format(pre)) old = ps.old # save prior old so can clean out if rotate successful ps.old = ps.new # move prior new to old so save previous one step ps.new = ps.nxt # move prior nxt to new which new is now current signer verfers = [] # assign verfers from current new was prior nxt for pub in ps.new.pubs: if self.aeid and not self.decrypter: # maybe should rethink this raise kering.DecryptError("Unauthorized decryption attempt. " "Aeid but no decrypter.") if ((signer := self.ks.pris.get(pub.encode("utf-8"), decrypter=self.decrypter)) is None): raise ValueError("Missing prikey in db for pubkey={}".format(pub)) verfers.append(signer.verfer) salt = pp.salt if salt: if self.aeid: if not self.decrypter: raise kering.DecryptError("Unauthorized decryption. Aeid but no decrypter.") salt = self.decrypter.decrypt(ser=salt).qb64 else: salt = coring.Salter(qb64=salt).qb64 # ensures salt was unencrypted creator = Creatory(algo=pp.algo).make(salt=salt, stem=pp.stem, tier=pp.tier) if not ncodes: # all same code, make list of len count of same code if ncount < 0: # next may be zero if non-trans raise ValueError("Invalid count={} must be >= 0.".format(ncount)) ncodes = [ncode for i in range(ncount)] pidx = pp.pidx # get pidx for this key sequence, may be used by salty creator ridx = ps.new.ridx + 1 kidx = ps.nxt.kidx + len(ps.new.pubs) # count set to 0 to ensure does not create signers if codes is empty signers = creator.create(codes=ncodes, count=0, pidx=pidx, ridx=ridx, kidx=kidx, transferable=transferable, temp=temp) digers = [coring.Diger(ser=signer.verfer.qb64b, code=dcode) for signer in signers] dt = helping.nowIso8601() ps.nxt = PubLot(pubs=[signer.verfer.qb64 for signer in signers], ridx=ridx, kidx=kidx, dt=dt) if not self.ks.sits.pin(pre, val=ps): raise ValueError("Problem updating pubsit db for pre={}.".format(pre)) for signer in signers: # store secrets (private key val keyed by public key) self.ks.pris.put(keys=signer.verfer.qb64b, val=signer, encrypter=self.encrypter) # store public keys for lookup of private keys by public key for replay self.ks.pubs.put(riKey(pre, ri=ps.nxt.ridx), val=PubSet(pubs=ps.nxt.pubs)) if erase: for pub in old.pubs: # remove prior old prikeys not current old self.ks.pris.rem(pub) return (verfers, digers)
[docs] def sign(self, ser, pubs=None, verfers=None, indexed=True, indices=None, ondices=None, pre=None, path=None): """ Returns list of signatures of ser if indexed as Sigers else as Cigars with .verfer assigned. Parameters: ser (bytes): serialization to sign pubs (list[str] | None): of qb64 public keys to lookup private keys one of pubs or verfers is required. If both then verfers is ignored. verfers (list[Verfer] | None): Verfer instances of public keys one of pubs or verfers is required. If both then verfers is ignored. If not pubs then gets public key from verfer.qb64 indexed (bool): True means use use indexed signatures and return list of Siger instances. False means do not use indexed signatures and return list of Cigar instances When indexed True, each index is an offset that maps the offset in the coherent lists: pubs, verfers, signers (pris from keystore .ks) onto the appropriate offset into the signing keys or prior next keys lists of a key event as determined by the indices and ondices lists, or appropriate defaults when indices and/or ondices are not provided. indices (list[int] | None): indices (offsets) when indexed == True, to use for indexed signatures whose offset into the current keys or prior next list may differ from the order of appearance in the provided coherent pubs, verfers, signers lists. This allows witness indexed sigs or controller multi-sig where the parties do not share the same manager or ordering so the default ordering in pubs or verfers is wrong for the index. This sets the value of the index property of the returned Siger. When provided the length of indices must match the len of the coherent lists: pubs, verfers, signers (pris from keystore .ks) else raises ValueError. When not provided and indexed is True then use default index that is the offset into the coherent lists: pubs, verfers, signers (pris from keystore .ks) ondices (list[int | None] | None): other indices (offsets) when indexed is True for indexed signatures whose offset into the prior next list may differ from the order of appearance in the provided coherent pubs, verfers, signers lists. This allows partial rotation with reserve or custodial key management so that the index (hash of index) of the public key for the signature appears at a different index in the current key list from the prior next list. This sets the value of the ondex property of the returned Siger. When provided the length of indices must match the len of the coherent lists: pubs, verfers, signers (pris from keystore .ks) else raises ValueError. When no ondex is applicable to a given signature then the value of the entry in ondices MUST be None. When ondices is not provided then all sigers .ondex is None. pre (str | None): identity prefix (aid) of signer. Used for HDK salty algo key lookup or re-creation. Look up key path state and algo from local keystore .ks path (tuple | None): HDX randy algo signing key path tuple part of form (ridx, kidx) where ridx is the optional rotation index and kidx is the required zeroth key index of the key list. The fully HDK path is formed by looking up the stem and pidx using the pre and when provided the ridx and then computing the pidxes for each signing key. When indices is provided then the kidxes are computed by adding the index offset to the zeroth kidx. When indices is not provided then the kidxes are assumed to be all the keys in the key list computed from the local keystore by subtracting the kidx of the zeroth element of the following key list. When path is not provided then the default is all the kidxs of the key list from the current .new key info. if neither pubs or verfers provided then returns empty list of signatures If pubs then ignores verfers otherwise uses verferss Manager implement .sign method and tests sign(self,ser,pubs,indexed=True) checks for pris for pubs in db is not raises error then signs ser with eah pub returns list of sigers indexed else list of cigars if not """ signers = [] if pubs is None and verfers is None: if pre is None: raise ValueError("pubs or verfers or pre required") # logic here to generate paths # use pre to read .ks.prms and .ks.sits to get algo stem and pidx and # sits .old an .new for pre if path is None: # use provided path tuple for .new or .nxt pass # defualt path is .new.ridx and .new.kidx # compute paths # if indices provided use indices to compute kidxes # otherwise default is all the keys from the .new key list so use # .nxt to comput number of keys to generate kidxes for paths paths = [] # use paths to generate signers if pubs: for pub in pubs: if self.aeid and not self.decrypter: raise kering.DecryptError("Unauthorized decryption attempt. " "Aeid but no decrypter.") if ((signer := self.ks.pris.get(pub, decrypter=self.decrypter)) is None): raise ValueError("Missing prikey in db for pubkey={}".format(pub)) signers.append(signer) else: for verfer in verfers: if self.aeid and not self.decrypter: raise kering.DecryptError("Unauthorized decryption attempt. " "Aeid but no decrypter.") if ((signer := self.ks.pris.get(verfer.qb64, decrypter=self.decrypter)) is None): raise ValueError("Missing prikey in db for pubkey={}".format(verfer.qb64)) signers.append(signer) if indices and len(indices) != len(signers): raise ValueError(f"Mismatch indices length={len(indices)} and resultant" f" signers length={len(signers)}") if ondices and len(ondices) != len(signers): raise ValueError(f"Mismatch ondices length={len(ondices)} and resultant" f" signers length={len(signers)}") if indexed: sigers = [] for j, signer in enumerate(signers): if indices: # not the default get index from indices i = indices[j] # must be whole number if not isinstance(i, int) or i < 0: raise ValueError(f"Invalid signing index = {i}, not " f"whole number.") else: # the default i = j # same index as database if ondices: # not the default get ondex from ondices o = ondices[j] # int means both, None means current only if not (o is None or isinstance(o, int) and not isinstance(o, bool) and o >= 0): raise ValueError(f"Invalid other signing index = {o}, not " f"None or not whole number.") else: # default o = i # must both be same value int # .sign assigns .verfer of siger and sets code of siger # appropriately for single or dual indexed signatures sigers.append(signer.sign(ser, index=i, only=True if o is None else False, ondex=o)) return sigers else: cigars = [] for signer in signers: cigars.append(signer.sign(ser)) # assigns .verfer to cigar return cigars
[docs] def decrypt(self, ser, pubs=None, verfers=None): """ Returns list of signatures of ser if indexed as Sigers else as Cigars with .verfer assigned. Parameters: ser (bytes): serialization to sign pubs (list[str] | None): of qb64 public keys to lookup private keys one of pubs or verfers is required. If both then verfers is ignored. verfers (list[Verfer] | None): Verfer instances of public keys one of pubs or verfers is required. If both then verfers is ignored. If not pubs then gets public key from verfer.qb64 Returns: bytes: decrypted data """ signers = [] if pubs: for pub in pubs: if self.aeid and not self.decrypter: raise kering.DecryptError("Unauthorized decryption attempt. " "Aeid but no decrypter.") if ((signer := self.ks.pris.get(pub, decrypter=self.decrypter)) is None): raise ValueError("Missing prikey in db for pubkey={}".format(pub)) signers.append(signer) else: for verfer in verfers: if self.aeid and not self.decrypter: raise kering.DecryptError("Unauthorized decryption attempt. " "Aeid but no decrypter.") if ((signer := self.ks.pris.get(verfer.qb64, decrypter=self.decrypter)) is None): raise ValueError("Missing prikey in db for pubkey={}".format(verfer.qb64)) signers.append(signer) plain = ser for signer in signers: sigkey = signer.raw + signer.verfer.raw # sigkey is raw seed + raw verkey prikey = pysodium.crypto_sign_sk_to_box_sk(sigkey) # raw private encrypt key pubkey = pysodium.crypto_scalarmult_curve25519_base(prikey) plain = pysodium.crypto_box_seal_open(plain, pubkey, prikey) # qb64b if plain == ser: raise ValueError("unable to decrypt data") return plain
[docs] def ingest(self, secrecies, iridx=0, ncount=1, ncode=coring.MtrDex.Ed25519_Seed, dcode=coring.MtrDex.Blake3_256, algo=Algos.salty, salt=None, stem=None, tier=None, rooted=True, transferable=True, temp=False): """ Ingest secrecies as a list of lists of secrets organized in event order to register the sets of secrets of associated externally generated keypair lists into the database. Returns: ret (tuple): (ipre, veferies) where: ipre is prefix index of ingested key pairs needed to fetch later for replay veferies is list of lists of all the verfers for the public keys from the private keys in secrecies in order of appearance. Essentially ingest ends with the current keys as the last key list in secrecies and the nxt keys are newly created as if a rotation to the last set of keys was performed. Unlike rotate, however, ingest does not delete any of the private keys it ingests. This must be done separately if desired. Each list in secrecies is an ordered list of private keys corresponding to the public list in the key state for each establishment event in order. The first list are the keys for the inception event, the next list for the first rotation, and each subsequent list for the next rotation and so on. May be used for import or recovery from backup. Method parameters specify the policy for generating new keys pairs for rotations that follow the ingested list of lists. The parameters are used to define how to rotate to new key pairs that follow the ingested sequence. Parameters: secrecies (list): list of lists of fully qualified secrets (private keys) iridx (int): initial ridx at where set PubSit after ingestion enables database to store where initial replay should start from ncount (int): count of next public keys for next after end of secrecies ncode (str): derivation code qb64 of all ncount next public keys after end of secrecies dcode is str derivation code qb64 of next digers after end of secrecies Default is MtrDex.Blake3_256 algo is str key creation algorithm code for next after end of secrecies salt is str qb64 salt for randomization when salty algorithm used for next after end of secrecies stem is path modifier used with salt to derive private keys when using salty agorithms. if stem is None then uses pidx tier is str security criticality tier code when using salty algorithm rooted is Boolean true means derive incept salt from root salt when incept salt not provided. Otherwise use incept salt only transferable is Boolean, True means each public key uses transferable derivation code. Default is transferable. Special case is non-transferable Use case for incept to use transferable = False is for basic derivation of non-transferable identifier prefix. When the derivation process of the identifier prefix is transferable then one should not use non-transferable for the associated public key(s). temp is Boolean. True is temporary for testing. It modifies strech time of salty algorithm """ if iridx > len(secrecies): raise ValueError(f"Initial ridx={iridx} beyond last secrecy.") # configure parameters for creating new keys after ingested sequence if rooted and salt is None: # use root salt instead of random salt salt = self.salt if rooted and tier is None: # use root tier as default tier = self.tier pidx = self.pidx # get next pidx creator = Creatory(algo=algo).make(salt=salt, stem=stem, tier=tier) ipre = "" dt = "" # empty for incept of old pubs = [] ridx = 0 kidx = 0 verferies = [] # list of lists of verfers first = True secrecies = deque(secrecies) while secrecies: csecrets = secrecies.popleft() # current csigners = [coring.Signer(qb64=secret, transferable=transferable) for secret in csecrets] csize = len(csigners) verferies.append([signer.verfer for signer in csigners]) if first: # Secret to encrypt here pp = PrePrm(pidx=pidx, algo=algo, salt=(creator.salt if not self.encrypter else self.encrypter.encrypt(ser=creator.salt).qb64), stem=creator.stem, tier=creator.tier) pre = csigners[0].verfer.qb64b ipre = csigners[0].verfer.qb64 if not self.ks.pres.put(pre, val=coring.Prefixer(qb64=pre)): raise ValueError("Already incepted pre={}.".format(pre.decode("utf-8"))) if not self.ks.prms.put(pre, val=pp): raise ValueError("Already incepted prm for pre={}.".format(pre.decode("utf-8"))) self.pidx = pidx + 1 # increment so unique first = False for signer in csigners: # store secrets (private key val keyed by public key) self.ks.pris.put(keys=signer.verfer.qb64b, val=signer, encrypter=self.encrypter) pubs = [signer.verfer.qb64 for signer in csigners] self.ks.pubs.put(riKey(pre, ri=ridx), val=PubSet(pubs=pubs)) dt = helping.nowIso8601() if ridx == max(iridx - 1, 0): # setup ps.old at this ridx if iridx == 0: old = PubLot() # defaults ok else: osigners = csigners osith = "{:x}".format(max(1, math.ceil(len(osigners) / 2))) ost = coring.Tholder(sith=osith).sith old=PubLot(pubs=pubs, ridx=ridx, kidx=kidx, dt=dt) ps = PreSit(old=old) # .new and .nxt are default if not self.ks.sits.pin(pre, val=ps): raise ValueError("Problem updating pubsit db for pre={}.".format(pre)) if ridx == iridx: # setup ps.new at this ridx if (ps := self.ks.sits.get(pre)) is None: raise ValueError("Attempt to rotate nonexistent pre={}.".format(pre)) new=PubLot(pubs=pubs, ridx=ridx, kidx=kidx, dt=dt) ps.new = new if not self.ks.sits.pin(pre, val=ps): raise ValueError("Problem updating pubsit db for pre={}.".format(pre)) if ridx == iridx + 1: # set up ps.nxt at this ridx if (ps := self.ks.sits.get(pre)) is None: raise ValueError("Attempt to rotate nonexistent pre={}.".format(pre)) nsigners = csigners nxt=PubLot(pubs=pubs, ridx=ridx, kidx=kidx, dt=dt) ps.nxt = nxt if not self.ks.sits.pin(pre, val=ps): raise ValueError("Problem updating pubsit db for pre={}.".format(pre)) ridx += 1 # next ridx kidx += csize # next kidx # create nxt signers after ingested signers nsigners = creator.create(count=ncount, code=ncode, pidx=pidx, ridx=ridx, kidx=kidx, transferable=transferable, temp=temp) for signer in nsigners: # store secrets (private key val keyed by public key) self.ks.pris.put(keys=signer.verfer.qb64b, val=signer) pubs = [signer.verfer.qb64 for signer in nsigners] self.ks.pubs.put(riKey(pre, ri=ridx), val=PubSet(pubs=pubs)) if ridx == iridx + 1: # want to set up ps.next at this ridx dt = helping.nowIso8601() if (ps := self.ks.sits.get(pre)) is None: raise ValueError("Attempt to rotate nonexistent pre={}.".format(pre)) nxt=PubLot(pubs=pubs, ridx=ridx, kidx=kidx, dt=dt) ps.nxt = nxt if not self.ks.sits.pin(pre, val=ps): raise ValueError("Problem updating pubsit db for pre={}.".format(pre)) return (ipre, verferies) #
[docs] def replay(self, pre, dcode=coring.MtrDex.Blake3_256, advance=True, erase=True): """ Returns duple (verfers, digers) associated with public key set from the key sequence for identifier prefix pre at rotation index ridx stored in db .pubs. Inception is at ridx == 0. Enables replay of preexisting public key sequence. In returned duple: verfers is list of current public key verfers public key is verfer.qb64 digers is list of next public key digers digest to xor is diger.raw If key sequence at ridx does already exist in .pubs database for pre then raises ValueError. If preexisting pubs for pre exist but .ridx is two large for preexisting pubs then raises IndexError. Parameters: pre (str): fully qualified qb64 identifier prefix dcode (str): derivation code for digers for next xor digest. Default is MtrDex.Blake3_256 advance (bool): True means advance to next set of keys for replay erase (bool): True means erase old private keys made stale by advancement when advance is True otherwise ignore """ if (pp := self.ks.prms.get(pre)) is None: raise ValueError("Attempt to replay nonexistent pre={}.".format(pre)) if (ps := self.ks.sits.get(pre)) is None: raise ValueError("Attempt to replay nonexistent pre={}.".format(pre)) if advance: old = ps.old # save prior old so can clean out if rotate successful ps.old = ps.new # move prior new to old so save previous one step ps.new = ps.nxt # move prior nxt to new which new is now current signer ridx = ps.new.ridx kidx = ps.new.kidx csize = len(ps.new.pubs) # Usually when next keys are null then aid is effectively non-transferable # but when replaying injected keys reaching null next pub keys or # equivalently default empty is the sign that we have reached the # end of the replay so need to raise an IndexError if not (pubset := self.ks.pubs.get(riKey(pre, ridx+1))): # empty nxt public keys so end of replay raise IndexError(f"Invalid replay attempt of pre={pre} at " f"ridx={ridx}.") pubs = pubset.pubs # create nxt from pubs dt = helping.nowIso8601() nxt=PubLot(pubs=pubs, ridx=ridx+1, kidx=kidx+csize, dt=dt) ps.nxt = nxt verfers = [] # assign verfers from current new was prior nxt for pub in ps.new.pubs: if self.aeid and not self.decrypter: # maybe should rethink this raise kering.DecryptError("Unauthorized decryption attempt. " "Aeid but no decrypter.") if ((signer := self.ks.pris.get(pub.encode("utf-8"), decrypter=self.decrypter)) is None): raise ValueError("Missing prikey in db for pubkey={}".format(pub)) verfers.append(signer.verfer) digers = [coring.Diger(ser=pub.encode("utf-8"), code=dcode) for pub in ps.nxt.pubs] if advance: if not self.ks.sits.pin(pre, val=ps): raise ValueError("Problem updating pubsit db for pre={}.".format(pre)) if erase: for pub in old.pubs: # remove prior old prikeys not current old self.ks.pris.rem(pub) return (verfers, digers)
[docs] class ManagerDoer(doing.Doer): """ Basic Manager Doer to initialize keystore database .ks Inherited Attributes: .done is Boolean completion state: True means completed Otherwise incomplete. Incompletion maybe due to close or abort. Attributes: .manager is Manager subclass Inherited Properties: .tyme is float relative cycle time of associated Tymist .tyme obtained via injected .tymth function wrapper closure. .tymth is function wrapper closure returned by Tymist .tymeth() method. When .tymth is called it returns associated Tymist .tyme. .tymth provides injected dependency on Tymist tyme base. .tock is float, desired time in seconds between runs or until next run, non negative, zero means run asap Properties: Methods: .wind injects ._tymth dependency from associated Tymist to get its .tyme .__call__ makes instance callable Appears as generator function that returns generator .do is generator method that returns generator .enter is enter context action method .recur is recur context action method or generator method .exit is exit context method .close is close context method .abort is abort context method Hidden: ._tymth is injected function wrapper closure returned by .tymen() of associated Tymist instance that returns Tymist .tyme. when called. ._tock is hidden attribute for .tock property """
[docs] def __init__(self, manager, **kwa): """ Parameters: manager (Manager): instance """ super(ManagerDoer, self).__init__(**kwa) self.manager = manager
def enter(self): """""" if not self.manager.inited: self.manager.setup(**self.manager._inits) def exit(self): """""" pass