# -*- encoding: utf-8 -*-
"""
KERI
keri.app.keeping module
Terminology:
salt is 128 bit 16 char random bytes used as root entropy to derive seed or secret
private key same as seed or secret for key pair
seed or secret or private key is crypto suite length dependent random bytes
public key
Example usage::
txn.put(
did.encode(),
json.dumps(certifiable_data).encode("utf-8")
)
raw_data = txn.get(did.encode())
if raw_data is None:
return None
return json.loads(raw_data)
ked = json.loads(raw[:size].decode("utf-8"))
raw = json.dumps(ked, separators=(",", ":"), ensure_ascii=False).encode("utf-8")
"""
import math
from collections import namedtuple, deque
from dataclasses import dataclass, asdict, field
import pysodium
from hio.base import doing
from ..kering import ClosedError, AuthError, DecryptError
from ..core import (Prefixer, Number, Diger, Tholder,
Cipher, Signer, Salter,
Encrypter, Decrypter, Tiers, MtrDex)
from ..db import (openLMDB, LMDBer, Suber,
CryptSignerSuber, CesrSuber,
CatCesrIoSetSuber, Komer)
from ..help import nowIso8601
Algoage = namedtuple("Algoage", 'randy salty group extern')
Algos = Algoage(randy='randy', salty='salty', group="group", extern="extern") # randy is rerandomize, salty is use salt
[docs]
@dataclass()
class PubLot:
"""
Public key list with indexes and datetime created
Attributes:
pubs (list): list of fully qualified Base64 public keys. Defaults to empty.
ridx (int): rotation index of set of public keys at establishment event.
Includes of key set at inception event is 0.
kidx (int): key index of starting key in key set in sequence wrt to all
public keys. Example if each set has 3 keys then ridx 2 has
kidx of 2*3 = 6.
dt (str): datetime in ISO8601 format of when key set was first created
"""
pubs: list = field(default_factory=list) # list qb64 public keys.
ridx: int = 0 # index of rotation (est event) that uses public key set
kidx: int = 0 # index of key in sequence of public keys
dt: str = "" # datetime ISO8601 when key set created
def __iter__(self):
return iter(asdict(self))
[docs]
@dataclass()
class PreSit:
"""
Prefix's public key situation (sets of public kets)
"""
old: PubLot = field(default_factory=PubLot) # previous publot
new: PubLot = field(default_factory=PubLot) # newly current publot
nxt: PubLot = field(default_factory=PubLot) # next public publot
def __iter__(self):
return iter(asdict(self))
[docs]
@dataclass()
class PrePrm:
"""
Prefix's parameters for creating new key pairs
"""
pidx: int = 0 # prefix index for this keypair sequence
algo: str = Algos.salty # salty default uses indices and salt to create new key pairs
salt: str = '' # empty salt used for salty algo.
stem: str = '' # default unique path stem for salty algo
tier: str = '' # security tier for stretch index salty algo
def __iter__(self):
return iter(asdict(self))
[docs]
@dataclass()
class PubSet:
"""
Prefix's public key set (list) at rotation index ridx
"""
pubs: list = field(default_factory=list) # list qb64 public keys.
def __iter__(self):
return iter(asdict(self))
[docs]
def riKey(pre, ri):
"""
Returns bytes DB key from concatenation with '.' of qualified Base64 prefix
bytes pre and int ri (rotation index) of key rotation.
Inception has ri == 0
"""
if hasattr(pre, "encode"):
pre = pre.encode("utf-8") # convert str to bytes
return (b'%s.%032x' % (pre, ri))
[docs]
def openKS(name="test", **kwa):
"""
Returns contextmanager generated by openLMDB but with Keeper instance as
KeyStore
default name="test"
default temp=True,
openLMDB Parameters:
cls is Class instance of subclass instance
name is str name of LMDBer dirPath so can have multiple databasers
at different directory path names thar each use different name
temp is Boolean, True means open in temporary directory, clear on close
Otherwise open in persistent directory, do not clear on close
"""
return openLMDB(cls=Keeper, name=name, **kwa)
[docs]
class Keeper(LMDBer):
"""
Keeper sets up named sub databases for key pair storage (KS).
Methods provide key pair creation, storage, and data signing.
Attributes: (inherited)
name (str): unique path component used in directory or file path name
base (str): another unique path component inserted before name
temp (bool): True means use /tmp directory
headDirPath is head directory path
path is full directory path
perm is numeric os permissions for directory and/or file(s)
filed (bool): True means .path ends in file. Otherwise .path ends in directory.
mode (str): file open mode if filed
fext (str): file extension if filed
file (File)
opened is Boolean, True means directory created and if file then file is opened. False otherwise.
env (lmdb.env): LMDB main (super) database environment
readonly (bool): True means open LMDB env as readonly
Attributes:
gbls (Suber): named sub DB whose values are global parameters
for all prefixes
Key is parameter labels
Value is parameter::
parameters:
aeid (bytes): fully qualified qb64 non-transferable identifier
prefix for authentication via signing and asymmetric encryption
of secrets using the associated (public, private) key pair.
Secrets include both salts and private keys for all key sets
in keeper. Defaults to empty which means no authentication
or encryption of key sets.
pidx (bytes): hex index of next prefix key-pair sequence to be incepted
algo (str): default root algorithm for generating key pair
salt (bytes): root salt for generating key pairs
tier (bytes): default root security tier for root salt
pris (CryptSignerSuber): named sub DB whose keys are public key
from key pair and values are private keys from key pair
Key is public key (fully qualified qb64)
Value is private key (fully qualified qb64)
pres (CesrSuber): named sub DB whose values are prefixes or first
public keys
Key is first public key in key sequence for a prefix (fully qualified qb64)
Value is prefix or first public key (temporary) (fully qualified qb64
prms (Komer): named sub DB whose values are serialized dicts of
PrePrm instance
Key is identifier prefix (fully qualified qb64)
Value is serialized parameter dict of public key parameters::
{
pidx: ,
algo: ,
salt: ,
stem: ,
tier: ,
}
sits (Komer): named sub DB whose values are serialized dicts of
PreSit instance
Key is identifier prefix (fully qualified qb64)
Value is serialized parameter dict of public key situation::
{
old: { pubs: ridx, kidx, dt },
new: { pubs: ridx, kidx, dt },
nxt: { pubs: ridx, kidx, dt }
}
.pubs (Komer): named sub DB whose values are serialized lists of
public keys
Enables lookup of public keys from prefix and ridx for replay of
public keys by prefix in establishment event order.
Key is prefix.ridx (rotation index as 32 char hex string)
use riKey(pre, ri)
Value is serialized list of fully qualified public keys that are the
current signing keys after the rotation given by rotation index
Properties:
"""
TailDirPath = "keri/ks"
AltTailDirPath = ".keri/ks"
TempPrefix = "keri_ks_"
MaxNamedDBs = 10
[docs]
def __init__(self, headDirPath=None, perm=None, reopen=False, **kwa):
"""
Setup named sub databases.
Inherited Parameters:
name is str directory path name differentiator for main database.
When system employs more than one keri database, name allows
differentiating each instance by name. Default name='main'.
temp is boolean, assign to .temp.
True then open in temporary directory, clear on close.
Otherwise then open persistent directory, do not clear on close.
Default temp=False.
headDirPath is optional str head directory pathname for main database.
If not provided use default .HeadDirpath. Default headDirPath=None.
perm is numeric optional os dir permissions mode. Default perm=None.
reopen is boolean, IF True then database will be reopened by this init.
Default reopen=True.
Notes:
dupsort=True for sub DB means allow unique (key,pair) duplicates at a key.
Duplicate means that is more than one value at a key but not a redundant
copies a (key,value) pair per key. In other words the pair (key,value)
must be unique both key and value in combination.
Attempting to put the same (key,value) pair a second time does
not add another copy.
Duplicates are inserted in lexocographic order by value, insertion order.
"""
if perm is None:
perm = self.Perm # defaults to restricted permissions for non temp
super(Keeper, self).__init__(headDirPath=headDirPath, perm=perm,
reopen=reopen, **kwa)
[docs]
def reopen(self, **kwa):
"""
Open sub databases
"""
opened = super(Keeper, self).reopen(**kwa)
# Create by opening first time named sub DBs within main DB instance
# Names end with "." as sub DB name must include a non Base64 character
# to avoid namespace collisions with Base64 identifier prefixes.
self.gbls = Suber(db=self, subkey='gbls.')
self.pris = CryptSignerSuber(db=self, subkey='pris.')
self.prxs = CesrSuber(db=self,
subkey='prxs.',
klas=Cipher)
self.nxts = CesrSuber(db=self,
subkey='nxts.',
klas=Cipher)
self.smids = CatCesrIoSetSuber(db=self,
subkey='smids.',
klas=(Prefixer, Number))
self.rmids = CatCesrIoSetSuber(db=self,
subkey='rmids.',
klas=(Prefixer, Number))
self.pres = CesrSuber(db=self,
subkey='pres.',
klas=Prefixer)
self.prms = Komer(db=self,
subkey='prms.',
klas=PrePrm,) # New Prefix Parameters
self.sits = Komer(db=self,
subkey='sits.',
klas=PreSit,) # Prefix Situation
self.pubs = Komer(db=self,
subkey='pubs.',
klas=PubSet,) # public key set at pre.ridx
return self.opened
[docs]
class KeeperDoer(doing.Doer):
"""
Basic Keeper Doer ( LMDB Database )
Inherited Attributes:
.done is Boolean completion state:
True means completed
Otherwise incomplete. Incompletion maybe due to close or abort.
Attributes:
.keeper is Keeper or LMDBer subclass
Inherited Properties:
.tyme is float relative cycle time of associated Tymist .tyme obtained
via injected .tymth function wrapper closure.
.tymth is function wrapper closure returned by Tymist .tymeth() method.
When .tymth is called it returns associated Tymist .tyme.
.tymth provides injected dependency on Tymist tyme base.
.tock is float, desired time in seconds between runs or until next run,
non negative, zero means run asap
Properties:
Methods:
.wind injects ._tymth dependency from associated Tymist to get its .tyme
.__call__ makes instance callable
Appears as generator function that returns generator
.do is generator method that returns generator
.enter is enter context action method
.recur is recur context action method or generator method
.exit is exit context method
.close is close context method
.abort is abort context method
Hidden:
._tymth is injected function wrapper closure returned by .tymen() of
associated Tymist instance that returns Tymist .tyme. when called.
._tock is hidden attribute for .tock property
"""
[docs]
def __init__(self, keeper, **kwa):
"""
Parameters:
keeper is Keeper instance
"""
super(KeeperDoer, self).__init__(**kwa)
self.keeper = keeper
def enter(self, *, temp=None):
""""""
if not self.keeper.opened:
self.keeper.reopen() # reopen(temp=temp)
def exit(self):
""""""
self.keeper.close(clear=self.keeper.temp)
[docs]
class Creator:
"""
Class for creating a key pair based on algorithm.
Attributes:
Properties:
Methods:
.create is method to create key pair
Hidden:
"""
[docs]
def __init__(self, **kwa):
"""
Setup Creator.
Parameters:
"""
[docs]
def create(self, **kwa):
"""
Returns tuple of signers one per key pair
"""
return []
@property
def salt(self):
"""
salt property getter
"""
return ''
@property
def stem(self):
"""
stem property getter
"""
return ''
@property
def tier(self):
"""
tier property getter
"""
return ''
[docs]
class RandyCreator(Creator):
"""
Class for creating a key pair based on re-randomizing each seed algorithm.
Attributes:
Properties:
Methods:
.create is method to create key pair
Hidden:
"""
[docs]
def __init__(self, **kwa):
"""
Setup Creator.
Parameters:
"""
super(RandyCreator, self).__init__(**kwa)
[docs]
def create(self, codes=None, count=1, code=MtrDex.Ed25519_Seed,
transferable=True, **kwa):
"""
Returns list of signers one per kidx in kidxs
Parameters:
codes is list of derivation codes one per key pair to create
count is count of key pairs to create is codes not provided
code is derivation code to use for count key pairs if codes not provided
transferable is Boolean, True means use trans deriv code. Otherwise nontrans
"""
signers = []
if not codes: # if not codes make list len count of same code
codes = [code for i in range(count)]
for code in codes:
signers.append(Signer(code=code, transferable=transferable))
return signers
[docs]
class SaltyCreator(Creator):
"""
Class for creating a key pair based on random salt plus path stretch algorithm.
Attributes:
.salter is salter instance
Properties:
Methods:
.create is method to create key pair
Hidden:
._salter holds instance for .salter property
"""
[docs]
def __init__(self, salt=None, stem=None, tier=None, **kwa):
"""
Setup Creator.
Parameters:
salt is unique salt from which to derive private key
stem is path modifier used with salt to derive private keys.
if stem is None then uses pidx
tier is derivation criticality that determines how much hashing to use.
"""
super(SaltyCreator, self).__init__(**kwa)
self.salter = Salter(qb64=salt, tier=tier)
self._stem = stem if stem is not None else ''
@property
def salt(self):
"""
salt property getter
"""
return self.salter.qb64
@property
def stem(self):
"""
stem property getter
"""
return self._stem
@property
def tier(self):
"""
tier property getter
"""
return self.salter.tier
[docs]
def create(self, codes=None, count=1, code=MtrDex.Ed25519_Seed,
pidx=0, ridx=0, kidx=0, transferable=True, temp=False, **kwa):
"""
Returns list of signers one per kidx in kidxs
Parameters:
codes is list of derivation codes one per key pair to create
count is count of key pairs to create is codes not provided
code is derivation code to use for count key pairs if codes not provided
pidx is int prefix index for key pair sequence
ridx is int rotation index for key pair set
kidx is int starting key index for key pair set
transferable is Boolean, True means use trans deriv code. Otherwise nontrans
temp is Boolean True means use temp stretch otherwise use time set
by tier for streching
"""
signers = []
if not codes: # if not codes make list len count of same code
codes = [code for i in range(count)]
stem = self.stem if self.stem else "{:x}".format(pidx) # if not stem use pidx
for i, code in enumerate(codes):
path = "{}{:x}{:x}".format(stem, ridx, kidx + i)
signers.append(self.salter.signer(path=path,
code=code,
transferable=transferable,
tier=self.tier,
temp=temp))
return signers
[docs]
class Creatory:
"""
Factory class for creating Creator subclasses to create key pairs based on
the provided algorithm.
Usage: creator = Creatory(algo='salty').make(salt=b'0123456789abcdef')
Attributes:
Properties:
Methods:
.create is method to create key pair
Hidden:
._create is method reference set to one of algorithm methods
._novelCreate
._indexCreate
"""
[docs]
def __init__(self, algo=Algos.salty):
"""
Setup Creator.
Parameters:
algo is str code for algorithm
"""
if algo == Algos.randy:
self._make = self._makeRandy
elif algo == Algos.salty:
self._make = self._makeSalty
else:
raise ValueError("Unsupported creation algorithm ={}.".format(algo))
[docs]
def make(self, **kwa):
"""
Returns Creator subclass based on inited algo
"""
return (self._make(**kwa))
def _makeRandy(self, **kwa):
"""
"""
return RandyCreator(**kwa)
def _makeSalty(self, **kwa):
"""
"""
return SaltyCreator(**kwa)
# default values to init manager's globals database
Initage = namedtuple("Initage", 'aeid pidx salt tier')
[docs]
class Manager:
"""Manages key pairs creation, storage, and signing
Class for managing key pair creation, storage, retrieval, and message signing.
Attributes:
ks (Keeper): key store LMDB database instance for storing public and private keys
encrypter (Encrypter): instance for encrypting secrets. Public
encryption key is derived from aeid (public signing key)
decrypter (Decrypter): instance for decrypting secrets. Private
decryption key is derived seed (private signing key seed)
inited (bool): True means fully initialized wrt database.
False means not yet fully initialized
Attributes (Hidden):
_seed (str): qb64 private-signing key (seed) for the aeid from which
the private decryption key is derived. If aeid stored in
database is not empty then seed may required to do any key
management operations. The seed value is memory only and MUST NOT
be persisted to the database for the manager with which it is used.
It MUST only be loaded once when the process that runs the Manager
is initialized. Its presence acts as an authentication, authorization,
and decryption secret for the Manager and must be stored on
another device from the device that runs the Manager.
Properties:
aeid (str): authentication and encryption fully qualified qb64
non-transferable identifier prefix for authentication via signing
and asymmetric encryption of secrets using the associated
(public, private) key pair. Secrets include both salts and private
keys for all key sets in keeper. Defaults to empty which means no
authentication or encryption of key sets. Use initial attribute because
keeper may not be open on init.
pidx (int): pidx prefix index.
Use initial attribute because keeper
may not be open on init. Each sequence gets own pidx. Enables
unique recreatable salting of key sequence based on pidx.
salt (str): qb64 of root salt. Makes random root salt if not provided
initial salt. Use inital attribute because keeper may not be
open on init.
tier (str): initial security tier as value of Tierage. Use initial attribute
because keeper may not be open on init
Methods:
"""
[docs]
def __init__(self, *, ks=None, seed=None, **kwa):
"""
Setup Manager.
Parameters:
ks (Keeper): key store instance (LMDB)
seed (str): qb64 private-signing key (seed) for the aeid from which
the private decryption key may be derived. If aeid stored in
database is not empty then seed may required to do any key
management operations. The seed value is memory only and MUST NOT
be persisted to the database for the manager with which it is used.
It MUST only be loaded once when the process that runs the Manager
is initialized. Its presence acts as an authentication, authorization,
and decryption secret for the Manager and must be stored on
another device from the device that runs the Manager.
Currently only code MtrDex.Ed25519_Seed is supported.
Parameters: Passthrough to .setup for later initialization
aeid (str): qb64 of non-transferable identifier prefix for
authentication and encryption of secrets in keeper. If provided
aeid (not None) and different from aeid stored in database then
all secrets are re-encrypted using new aeid. In this case the
provided seed must not be empty. A change in aeid should require
a second authentication mechanism besides the seed.
pidx (int): index of next new created key pair sequence bound to a
given identifier prefix. Each sequence gets own pidx. Enables
unique recreatable salting of key sequence based on pidx.
salt (str): qb64 of root salt. Makes random root salt if not provided
tier (str): default security tier (Tierage) for root salt
"""
self.ks = ks if ks is not None else Keeper(reopen=True)
self.encrypter = None
self.decrypter = None
self._seed = seed if seed is not None else ""
self.inited = False
# save keyword arg parameters to init later if db not opened yet
self._inits = kwa
if self.ks.opened: # allows keeper db to opened asynchronously
self.setup(**self._inits) # first call to .setup with initialize database
[docs]
def setup(self, aeid=None, pidx=None, algo=None, salt=None, tier=None):
"""
Setups manager root or global attributes and properties
Assumes that .keeper db is open.
If .keeper.gbls sub database has not been initialized for the first time
then initializes from ._inits. This allows dependency injection of
keepr db into manager instance prior to keeper db being opened to
accomodate asynchronous process setup of db resources. Putting the db
initialization here enables asynchronous opening of keeper db after
keeper instance is instantiated. First call to .setup will initialize
keeper db defaults if never before initialized (vacuous initialization).
Parameters:
aeid (str): qb64 of non-transferable identifier prefix for
authentication and encryption of secrets in keeper. If provided
aeid (not None) and different from aeid stored in database then
all secrets are re-encrypted using new aeid. In this case the
provided seed must not be empty. A change in aeid should require
a second authentication mechanism besides the secret.
aeid same as current aeid no change innocuous
aeid different but empty which unencrypts and removes aeid
aeid different not empty which reencrypts and updates aeid
pidx (int): index of next new created key pair sequence for given
identifier prefix
algo (str): root algorithm (randy or salty) for creating key pairs
salt (str): qb64 of root salt. Makes random root salt if not provided
tier (str): default security tier (Tierage) for root salt
"""
if not self.ks.opened:
raise ClosedError("Attempt to setup Manager closed keystore"
" database .ks.")
if aeid is None:
aeid = ''
if pidx is None:
pidx = 0
if algo is None:
algo = Algos.salty
if salt is None:
salt = Salter().qb64
else:
if Salter(qb64=salt).qb64 != salt:
raise ValueError(f"Invalid qb64 for salt={salt}.")
if tier is None:
tier = Tiers.low
# update database if never before initialized
if self.pidx is None: # never before initialized
self.pidx = pidx # init to default
if self.algo is None: # never before initialized
self.algo = algo
if self.salt is None: # never before initialized
self.salt = salt
if self.tier is None: # never before initialized
self.tier = tier # init to default
# must do this after salt is initialized so gets re-encrypted correctly
if not self.aeid: # never before initialized
self.updateAeid(aeid, self.seed)
else:
self.encrypter = Encrypter(verkey=self.aeid) # derive encrypter from aeid
if not self.seed or not self.encrypter.verifySeed(self.seed):
raise AuthError("Last seed missing or provided last seed "
"not associated with last aeid={}."
"".format(self.aeid))
self.decrypter = Decrypter(seed=self.seed)
self.inited = True
[docs]
def updateAeid(self, aeid, seed):
"""
Given seed belongs to aeid and encrypter, update aeid and re-encrypt all
secrets
Parameters:
aeid (Optional(str)): qb64 of new auth encrypt id (public signing key)
aeid may match current aeid no change innocuous
aeid may be empty which unencrypts and removes aeid
aeid may be different not empty which reencrypts
seed (str): qb64 of new seed from which new aeid is derived (private signing
key seed)
"""
if self.aeid: # check that last current seed matches last current .aeid
# verifies seed belongs to aeid
if not self.seed or not self.encrypter.verifySeed(self.seed):
raise AuthError("Last seed missing or provided last seed "
"not associated with last aeid={}."
"".format(self.aeid))
if aeid: # aeid provided
if aeid != self.aeid: # changing to a new aeid so update .encrypter
self.encrypter = Encrypter(verkey=aeid) # derive encrypter from aeid
# verifies new seed belongs to new aeid
if not seed or not self.encrypter.verifySeed(seed):
raise AuthError("Seed missing or provided seed not associated"
" with provided aeid={}.".format(aeid))
else: # changing to empty aeid so new encrypter is None
self.encrypter = None
# fetch all secrets from db, decrypt all secrets with self.decrypter
# unless they decrypt automatically on fetch and then re-encrypt with
# encrypter update db with re-encrypted values
# re-encypt root salt secret, .salt property is automatically decrypted on fetch
if (salt := self.salt) is not None: # decrypted salt
self.salt = salt
# self.salt = self.encrypter.encrypt(ser=salt).qb64 if self.encrypter else salt
# other secrets
if self.decrypter:
# re-encrypt root salt secrets by prefix parameters .prms
for keys, data in self.ks.prms.getTopItemIter(): # keys is tuple of pre qb64
if data.salt:
salter = self.decrypter.decrypt(qb64=data.salt)
data.salt = (self.encrypter.encrypt(prim=salter).qb64
if self.encrypter else salter.qb64)
self.ks.prms.pin(keys, val=data)
# private signing key seeds
# keys is tuple == (verkey.qb64,) .pris database auto decrypts
for keys, signer in self.ks.pris.getTopItemIter(decrypter=self.decrypter):
self.ks.pris.pin(keys, signer, encrypter=self.encrypter)
self.ks.gbls.pin("aeid", aeid) # set aeid in db
self._seed = seed # set .seed in memory
# update .decrypter
self.decrypter = Decrypter(seed=seed) if seed else None
@property
def seed(self):
"""
seed property getter from ._seed.
seed (str): qb64 from which aeid is derived
"""
return self._seed
@property
def aeid(self):
"""
aeid property getter from key store db.
Assumes db initialized.
aeid is qb64 auth encrypt id prefix
"""
return self.ks.gbls.get('aeid')
@property
def pidx(self):
"""
pidx property getter from key store db.
Assumes db initialized.
pidx is prefix index int for next new key sequence
"""
if (pidx := self.ks.gbls.get("pidx")) is not None:
return int(pidx, 16)
return pidx # None
@pidx.setter
def pidx(self, pidx):
"""
pidx property setter to key store db.
pidx is prefix index int for next new key sequence
"""
self.ks.gbls.pin("pidx", "%x" % pidx)
@property
def algo(self):
"""
also property getter from key store db.
Assumes db initialized.
algo is default root algorithm for creating key pairs
"""
return self.ks.gbls.get('algo')
@algo.setter
def algo(self, algo):
"""
algo property setter to key store db.
algo is default root algorithm for creating key pairs
"""
self.ks.gbls.pin('algo', algo)
@property
def salt(self):
"""
salt property getter from key store db.
Assumes db initialized.
salt is default root salt for new key sequence creation
"""
salt = self.ks.gbls.get('salt')
if self.decrypter: # given .decrypt secret salt must be encrypted in db
return self.decrypter.decrypt(qb64=salt).qb64
return salt
@salt.setter
def salt(self, salt):
"""
salt property setter to key store db.
Parameters:
salt (str): qb64 default root salt for new key sequence creation
may be plain text or cipher text handled by updateAeid
"""
if self.encrypter:
salt = self.encrypter.encrypt(ser=salt, code=MtrDex.X25519_Cipher_Salt).qb64
self.ks.gbls.pin('salt', salt)
@property
def tier(self):
"""
tier property getter from key store db.
Assumes db initialized.
tier is default root security tier for new key sequence creation
"""
return self.ks.gbls.get('tier')
@tier.setter
def tier(self, tier):
"""
tier property setter to key store db.
tier is default root security tier for new key sequence creation
"""
self.ks.gbls.pin('tier', tier)
[docs]
def incept(self, icodes=None, icount=1, icode=MtrDex.Ed25519_Seed,
ncodes=None, ncount=1, ncode=MtrDex.Ed25519_Seed,
dcode=MtrDex.Blake3_256,
algo=None, salt=None, stem=None, tier=None, rooted=True,
transferable=True, temp=False):
"""
Returns tuple (verfers, digers) for inception event where
verfers is list of current public key verfers
public key is verfer.qb64
digers is list of next public key digers
digest to xor is diger.raw
Incept a prefix. Use first public key as temporary prefix.
Must .repre later to move pubsit dict to correct permanent prefix.
Store the dictified PreSit in the keeper under the first public key
Parameters:
icodes is list of private key derivation codes qb64 str
one per incepting key pair
icount is int count of incepting public keys when icodes not provided
icode is str derivation code qb64 of all icount incepting private keys
when icodes list not provided
ncodes is list of private key derivation codes qb64 str
one per next key pair
ncount is int count of next public keys when ncodes not provided
ncode is str derivation code qb64 of all ncount next public keys
when ncodes not provided
dcode is str derivation code qb64 of next digers. Default is MtrDex.Blake3_256
algo is str key creation algorithm code
salt is str qb64 salt for randomization when salty algorithm used
stem is path modifier used with salt to derive private keys when using
salty agorithms. if stem is None then uses pidx
tier is str security criticality tier code when using salty algorithm
rooted is Boolean true means derive incept salt from root salt when
incept salt not provided. Otherwise use incept salt only
transferable is Boolean, True means each public key uses transferable
derivation code. Default is transferable. Special case is non-transferable
Use case for incept to use transferable = False is for basic
derivation of non-transferable identifier prefix.
When the derivation process of the identifier prefix is
transferable then one should not use non-transferable for the
associated public key(s).
temp is Boolean. True is temporary for testing. It modifies tier of salty algorithm
When both ncodes is empty and ncount is 0 then the nxt is null and will
not be rotatable. This makes the identifier non-transferable in effect
even when the identifier prefix is transferable.
"""
# get root defaults to initialize key sequence
if rooted and algo is None: # use root algo from db as default
algo = self.algo
if rooted and salt is None: # use root salt from db instead of random salt
salt = self.salt
if rooted and tier is None: # use root tier from db as default
tier = self.tier
pidx = self.pidx # get next pidx
ridx = 0 # rotation index
kidx = 0 # key pair index
creator = Creatory(algo=algo).make(salt=salt, stem=stem, tier=tier)
if not icodes: # all same code, make list of len icount of same code
if icount <= 0:
raise ValueError("Invalid icount={} must be > 0.".format(icount))
icodes = [icode for i in range(icount)]
isigners = creator.create(codes=icodes,
pidx=pidx, ridx=ridx, kidx=kidx,
transferable=transferable, temp=temp)
verfers = [signer.verfer for signer in isigners]
if not ncodes: # all same code, make list of len ncount of same code
if ncount < 0: # next may be zero if non-trans
raise ValueError("Invalid ncount={} must be >= 0.".format(ncount))
ncodes = [ncode for i in range(ncount)]
# count set to 0 to ensure does not create signers if ncodes is empty
nsigners = creator.create(codes=ncodes, count=0,
pidx=pidx, ridx=ridx+1, kidx=kidx+len(icodes),
transferable=transferable, temp=temp)
digers = [Diger(ser=signer.verfer.qb64b, code=dcode) for signer in nsigners]
# Secret to encrypt here
pp = PrePrm(pidx=pidx,
algo=algo,
stem=creator.stem,
tier=creator.tier)
if creator.salt:
pp.salt = (creator.salt if not self.encrypter
else self.encrypter.encrypt(ser=creator.salt,
code=MtrDex.X25519_Cipher_Salt).qb64)
dt = nowIso8601()
ps = PreSit(
new=PubLot(pubs=[verfer.qb64 for verfer in verfers],
ridx=ridx, kidx=kidx, dt=dt),
nxt=PubLot(pubs=[signer.verfer.qb64 for signer in nsigners],
ridx=ridx+1, kidx=kidx+len(icodes), dt=dt))
pre = verfers[0].qb64b
if not self.ks.pres.put(pre, val=Prefixer(qb64=pre)):
raise ValueError("Already incepted pre={}.".format(pre.decode("utf-8")))
if not self.ks.prms.put(pre, val=pp):
raise ValueError("Already incepted prm for pre={}.".format(pre.decode("utf-8")))
self.pidx = pidx + 1 # increment for next inception
if not self.ks.sits.put(pre, val=ps):
raise ValueError("Already incepted sit for pre={}.".format(pre.decode("utf-8")))
for signer in isigners: # store secrets (private key val keyed by public key)
self.ks.pris.put(keys=signer.verfer.qb64b, val=signer,
encrypter=self.encrypter)
self.ks.pubs.put(riKey(pre, ri=ridx), val=PubSet(pubs=ps.new.pubs))
for signer in nsigners: # store secrets (private key val keyed by public key)
self.ks.pris.put(keys=signer.verfer.qb64b, val=signer,
encrypter=self.encrypter)
# store publics keys for lookup of private key for replay
self.ks.pubs.put(riKey(pre, ri=ridx+1), val=PubSet(pubs=ps.nxt.pubs))
return (verfers, digers)
[docs]
def move(self, old, new):
"""
Assigns new pre to old default .pres at old
Moves PrePrm and PreSit dicts in keeper db from old default pre to new pre db key
The new pre is the newly derived prefix which may only be known some
time after the original creation of the associated key pairs.
Paraameters:
old is str for old prefix of pubsit dict in keeper db
new is str for new prefix to move pubsit dict to in keeper db
"""
if old == new:
return
if self.ks.pres.get(old) is None:
raise ValueError("Nonexistent old pre={}, nothing to assign.".format(old))
if self.ks.pres.get(new) is not None:
raise ValueError("Preexistent new pre={} may not clobber.".format(new))
if (oldprm := self.ks.prms.get(old)) is None:
raise ValueError("Nonexistent old prm for pre={}, nothing to move.".format(old))
if self.ks.prms.get(new) is not None:
raise ValueError("Preexistent new prm for pre={} may not clobber.".format(new))
if (oldsit := self.ks.sits.get(old)) is None:
raise ValueError("Nonexistent old sit for pre={}, nothing to move.".format(old))
if self.ks.sits.get(new) is not None:
raise ValueError("Preexistent new sit for pre={} may not clobber.".format(new))
if not self.ks.prms.put(new, val=oldprm):
raise ValueError("Failed moving prm from old pre={} to new pre={}.".format(old, new))
else:
self.ks.prms.rem(old)
if not self.ks.sits.put(new, val=oldsit):
raise ValueError("Failed moving sit from old pre={} to new pre={}.".format(old, new))
else:
self.ks.sits.rem(old)
# move .pubs entries if any
i = 0
while (pl := self.ks.pubs.get(riKey(old, i))):
if not self.ks.pubs.put(riKey(new, i), val=pl):
raise ValueError("Failed moving pubs at pre={} ri={} to new"
" pre={}".format(old, i, new))
i += 1
# assign old
if not self.ks.pres.pin(old, val=Prefixer(qb64=new)):
raise ValueError("Failed assiging new pre={} to old pre={}.".format(new, old))
# make new so that if move again we reserve each one
if not self.ks.pres.put(new, val=Prefixer(qb64=new)):
raise ValueError("Failed assiging new pre={}.".format(new))
[docs]
def rotate(self, pre, ncodes=None, ncount=1,
ncode=MtrDex.Ed25519_Seed,
dcode=MtrDex.Blake3_256,
transferable=True, temp=False, erase=True):
"""
Returns tuple (verfers, digers) for rotation event of keys for pre where
verfers is list of current public key verfers
public key is verfer.qb64
digers is list of next public key digers
digest to xor is diger.raw
Rotate a prefix.
Store the updated dictified PreSit in the keeper under pre
Parameters:
pre (str) qb64 of prefix
ncodes (list): of private key derivation codes qb64 str
one per next key pair
ncount (int): count of next public keys when icodes not provided
ncode (str): derivation code qb64 of all ncount next public keys
when ncodes not provided
dcode i(str): derivation code qb64 of next key digest of digers
Default is MtrDex.Blake3_256
transferable (bool): True means each public key uses transferable
derivation code. Default is transferable. Special case is non-transferable
Normally no use case for rotation to use transferable = False.
When the derivation process of the identifier prefix is
transferable then one should not use transferable = False for the
associated public key(s).
temp (bool): True is temporary for testing. It modifies tier of salty algorithm
erase (bool): True means erase old private keys made stale by rotation
When both ncodes is empty and ncount is 0 then the nxt is null and will
not be rotatable. This makes the identifier non-transferable in effect
even when the identifier prefix is transferable.
"""
# Secret to decrypt here
if (pp := self.ks.prms.get(pre)) is None:
raise ValueError("Attempt to rotate nonexistent pre={}.".format(pre))
if (ps := self.ks.sits.get(pre)) is None:
raise ValueError("Attempt to rotate nonexistent pre={}.".format(pre))
if not ps.nxt.pubs: # empty nxt public keys so non-transferable prefix
raise ValueError("Attempt to rotate nontransferable pre={}.".format(pre))
old = ps.old # save prior old so can clean out if rotate successful
ps.old = ps.new # move prior new to old so save previous one step
ps.new = ps.nxt # move prior nxt to new which new is now current signer
verfers = [] # assign verfers from current new was prior nxt
for pub in ps.new.pubs:
if self.aeid and not self.decrypter: # maybe should rethink this
raise DecryptError("Unauthorized decryption attempt. "
"Aeid but no decrypter.")
if ((signer := self.ks.pris.get(pub.encode("utf-8"),
decrypter=self.decrypter)) is None):
raise ValueError("Missing prikey in db for pubkey={}".format(pub))
verfers.append(signer.verfer)
salt = pp.salt
if salt:
if self.aeid:
if not self.decrypter:
raise DecryptError("Unauthorized decryption. Aeid but no decrypter.")
salt = self.decrypter.decrypt(qb64=salt).qb64
else:
salt = Salter(qb64=salt).qb64 # ensures salt was unencrypted
creator = Creatory(algo=pp.algo).make(salt=salt, stem=pp.stem, tier=pp.tier)
if not ncodes: # all same code, make list of len count of same code
if ncount < 0: # next may be zero if non-trans
raise ValueError("Invalid count={} must be >= 0.".format(ncount))
ncodes = [ncode for i in range(ncount)]
pidx = pp.pidx # get pidx for this key sequence, may be used by salty creator
ridx = ps.new.ridx + 1
kidx = ps.nxt.kidx + len(ps.new.pubs)
# count set to 0 to ensure does not create signers if codes is empty
signers = creator.create(codes=ncodes, count=0,
pidx=pidx, ridx=ridx, kidx=kidx,
transferable=transferable, temp=temp)
digers = [Diger(ser=signer.verfer.qb64b, code=dcode) for signer in signers]
dt = nowIso8601()
ps.nxt = PubLot(pubs=[signer.verfer.qb64 for signer in signers],
ridx=ridx, kidx=kidx, dt=dt)
if not self.ks.sits.pin(pre, val=ps):
raise ValueError("Problem updating pubsit db for pre={}.".format(pre))
for signer in signers: # store secrets (private key val keyed by public key)
self.ks.pris.put(keys=signer.verfer.qb64b, val=signer,
encrypter=self.encrypter)
# store public keys for lookup of private keys by public key for replay
self.ks.pubs.put(riKey(pre, ri=ps.nxt.ridx), val=PubSet(pubs=ps.nxt.pubs))
if erase:
for pub in old.pubs: # remove prior old prikeys not current old
self.ks.pris.rem(pub)
return (verfers, digers)
[docs]
def sign(self, ser, pubs=None, verfers=None, indexed=True,
indices=None, ondices=None, pre=None, path=None):
"""
Returns list of signatures of ser if indexed as Sigers else as Cigars with
.verfer assigned.
Parameters:
ser (bytes): serialization to sign
pubs (list[str] | None): of qb64 public keys to lookup private keys
one of pubs or verfers is required. If both then verfers is ignored.
verfers (list[Verfer] | None): Verfer instances of public keys
one of pubs or verfers is required. If both then verfers is ignored.
If not pubs then gets public key from verfer.qb64
indexed (bool):
True means use use indexed signatures and return
list of Siger instances.
False means do not use indexed signatures and return
list of Cigar instances
When indexed True, each index is an offset that maps the offset
in the coherent lists: pubs, verfers, signers (pris from keystore .ks)
onto the appropriate offset into the signing keys or prior next
keys lists of a key event as determined by the indices and ondices
lists, or appropriate defaults when indices and/or ondices are not
provided.
indices (list[int] | None): indices (offsets) when indexed == True,
to use for indexed signatures whose offset into the current keys
or prior next list may differ from the order of appearance
in the provided coherent pubs, verfers, signers lists.
This allows witness indexed sigs or controller multi-sig
where the parties do not share the same manager or ordering so
the default ordering in pubs or verfers is wrong for the index.
This sets the value of the index property of the returned Siger.
When provided the length of indices must match the len of the
coherent lists: pubs, verfers, signers (pris from keystore .ks)
else raises ValueError.
When not provided and indexed is True then use default index that
is the offset into the coherent lists:
pubs, verfers, signers (pris from keystore .ks)
ondices (list[int | None] | None): other indices (offsets)
when indexed is True for indexed signatures whose offset into
the prior next list may differ from the order of appearance
in the provided coherent pubs, verfers, signers lists.
This allows partial rotation with reserve or custodial key
management so that the index (hash of index) of the public key
for the signature appears at a different index in the
current key list from the prior next list.
This sets the value of the ondex property of the returned Siger.
When provided the length of indices must match the len of the
coherent lists: pubs, verfers, signers (pris from keystore .ks)
else raises ValueError.
When no ondex is applicable to a given signature then the value
of the entry in ondices MUST be None.
When ondices is not provided then all sigers .ondex is None.
pre (str | None): identity prefix (aid) of signer. Used for HDK salty
algo key lookup or re-creation. Look up key path state and algo
from local keystore .ks
path (tuple | None): HDX randy algo signing key path tuple part of form
(ridx, kidx) where ridx is the optional rotation index and kidx
is the required zeroth key index of the key list. The fully HDK
path is formed by looking up the stem and pidx using the pre
and when provided the ridx and then computing the pidxes for each
signing key. When indices is provided then the kidxes are computed
by adding the index offset to the zeroth kidx. When indices is not
provided then the kidxes are assumed to be all the keys in the key
list computed from the local keystore by subtracting the kidx
of the zeroth element of the following key list. When path is not
provided then the default is all the kidxs of the key list from
the current .new key info.
if neither pubs or verfers provided then returns empty list of signatures
If pubs then ignores verfers otherwise uses verferss
Manager implement .sign method and tests
sign(self,ser,pubs,indexed=True)
checks for pris for pubs in db is not raises error
then signs ser with eah pub
returns list of sigers indexed else list of cigars if not
"""
signers = []
if pubs is None and verfers is None:
if pre is None:
raise ValueError("pubs or verfers or pre required")
# logic here to generate paths
# use pre to read .ks.prms and .ks.sits to get algo stem and pidx and
# sits .old an .new for pre
if path is None: # use provided path tuple for .new or .nxt
pass
# defualt path is .new.ridx and .new.kidx
# compute paths
# if indices provided use indices to compute kidxes
# otherwise default is all the keys from the .new key list so use
# .nxt to comput number of keys to generate kidxes for paths
paths = []
# use paths to generate signers
if pubs:
for pub in pubs:
if self.aeid and not self.decrypter:
raise DecryptError("Unauthorized decryption attempt. "
"Aeid but no decrypter.")
if ((signer := self.ks.pris.get(pub, decrypter=self.decrypter))
is None):
raise ValueError("Missing prikey in db for pubkey={}".format(pub))
signers.append(signer)
else:
for verfer in verfers:
if self.aeid and not self.decrypter:
raise DecryptError("Unauthorized decryption attempt. "
"Aeid but no decrypter.")
if ((signer := self.ks.pris.get(verfer.qb64,
decrypter=self.decrypter))
is None):
raise ValueError("Missing prikey in db for pubkey={}".format(verfer.qb64))
signers.append(signer)
if indices and len(indices) != len(signers):
raise ValueError(f"Mismatch indices length={len(indices)} and resultant"
f" signers length={len(signers)}")
if ondices and len(ondices) != len(signers):
raise ValueError(f"Mismatch ondices length={len(ondices)} and resultant"
f" signers length={len(signers)}")
if indexed:
sigers = []
for j, signer in enumerate(signers):
if indices: # not the default get index from indices
i = indices[j] # must be whole number
if not isinstance(i, int) or i < 0:
raise ValueError(f"Invalid signing index = {i}, not "
f"whole number.")
else: # the default
i = j # same index as database
if ondices: # not the default get ondex from ondices
o = ondices[j] # int means both, None means current only
if not (o is None or
isinstance(o, int) and not isinstance(o, bool) and o >= 0):
raise ValueError(f"Invalid other signing index = {o}, not "
f"None or not whole number.")
else: # default
o = i # must both be same value int
# .sign assigns .verfer of siger and sets code of siger
# appropriately for single or dual indexed signatures
sigers.append(signer.sign(ser,
index=i,
only=True if o is None else False,
ondex=o))
return sigers
else:
cigars = []
for signer in signers:
cigars.append(signer.sign(ser)) # assigns .verfer to cigar
return cigars
[docs]
def decrypt(self, qb64, pubs=None, verfers=None):
"""
Returns decrypted plaintext of encrypted qb64 ciphertext serialization.
Parameters:
qb64 (str | bytes | bytearray | memoryview): fully qualified base64
ciphertext serialization to decrypt
pubs (list[str] | None): of qb64 public keys to lookup private keys
one of pubs or verfers is required. If both then verfers is ignored.
verfers (list[Verfer] | None): Verfer instances of public keys
one of pubs or verfers is required. If both then verfers is ignored.
If not pubs then gets public key from verfer.qb64 used to lookup
private keys
Returns:
plain (bytes): decrypted plaintext
"""
signers = []
if pubs:
for pub in pubs:
if self.aeid and not self.decrypter:
raise DecryptError("Unauthorized decryption attempt. "
"Aeid but no decrypter.")
if ((signer := self.ks.pris.get(pub, decrypter=self.decrypter))
is None):
raise ValueError("Missing prikey in db for pubkey={}".format(pub))
signers.append(signer)
else:
for verfer in verfers:
if self.aeid and not self.decrypter:
raise DecryptError("Unauthorized decryption attempt. "
"Aeid but no decrypter.")
if ((signer := self.ks.pris.get(verfer.qb64,
decrypter=self.decrypter))
is None):
raise ValueError("Missing prikey in db for pubkey={}".format(verfer.qb64))
signers.append(signer)
if hasattr(qb64, "encode"):
qb64 = qb64.encode() # convert str to bytes
qb64 = bytes(qb64) # convert bytearray or memoryview to bytes
for signer in signers:
sigkey = signer.raw + signer.verfer.raw # sigkey is raw seed + raw verkey
prikey = pysodium.crypto_sign_sk_to_box_sk(sigkey) # raw private encrypt key
pubkey = pysodium.crypto_scalarmult_curve25519_base(prikey)
plain = pysodium.crypto_box_seal_open(qb64, pubkey, prikey) # qb64b
if plain == qb64:
raise ValueError(f"Unable to decrypt.")
return plain
[docs]
def ingest(self, secrecies, iridx=0, ncount=1, ncode=MtrDex.Ed25519_Seed,
dcode=MtrDex.Blake3_256,
algo=Algos.salty, salt=None, stem=None, tier=None,
rooted=True, transferable=True, temp=False):
"""
Ingest secrecies as a list of lists of secrets organized in event order
to register the sets of secrets of associated externally generated keypair
lists into the database.
Returns:
ret (tuple): (ipre, veferies) where:
ipre is prefix index of ingested key pairs needed to fetch later
for replay
veferies is list of lists of all the verfers for the public keys
from the private keys in secrecies in order of appearance.
Essentially ingest ends with the current keys as the
last key list in secrecies and the nxt keys are newly created as if a
rotation to the last set of keys was performed. Unlike rotate, however,
ingest does not delete any of the private keys it ingests. This must be
done separately if desired.
Each list in secrecies is an ordered list of private keys corresponding
to the public list in the key state for each establishment event in order.
The first list are the keys for the inception event, the next list for
the first rotation, and each subsequent list for the next rotation and
so on.
May be used for import or recovery from backup.
Method parameters specify the policy for generating new keys pairs for
rotations that follow the ingested list of lists. The parameters are used
to define how to rotate to new key pairs that follow the ingested sequence.
Parameters:
secrecies (list): list of lists of fully qualified secrets (private keys)
iridx (int): initial ridx at where set PubSit after ingestion
enables database to store where initial replay should start from
ncount (int): count of next public keys for next after end of secrecies
ncode (str): derivation code qb64 of all ncount next public keys
after end of secrecies
dcode is str derivation code qb64 of next digers after end of secrecies
Default is MtrDex.Blake3_256
algo is str key creation algorithm code for next after end of secrecies
salt is str qb64 salt for randomization when salty algorithm used
for next after end of secrecies
stem is path modifier used with salt to derive private keys when using
salty agorithms. if stem is None then uses pidx
tier is str security criticality tier code when using salty algorithm
rooted is Boolean true means derive incept salt from root salt when
incept salt not provided. Otherwise use incept salt only
transferable is Boolean, True means each public key uses transferable
derivation code. Default is transferable. Special case is non-transferable
Use case for incept to use transferable = False is for basic
derivation of non-transferable identifier prefix.
When the derivation process of the identifier prefix is
transferable then one should not use non-transferable for the
associated public key(s).
temp is Boolean. True is temporary for testing. It modifies strech
time of salty algorithm
"""
if iridx > len(secrecies):
raise ValueError(f"Initial ridx={iridx} beyond last secrecy.")
# configure parameters for creating new keys after ingested sequence
if rooted and salt is None: # use root salt instead of random salt
salt = self.salt
if rooted and tier is None: # use root tier as default
tier = self.tier
pidx = self.pidx # get next pidx
creator = Creatory(algo=algo).make(salt=salt, stem=stem, tier=tier)
ipre = ""
dt = "" # empty for incept of old
pubs = []
ridx = 0
kidx = 0
verferies = [] # list of lists of verfers
first = True
secrecies = deque(secrecies)
while secrecies:
csecrets = secrecies.popleft() # current
csigners = [Signer(qb64=secret, transferable=transferable)
for secret in csecrets]
csize = len(csigners)
verferies.append([signer.verfer for signer in csigners])
if first:
# Secret to encrypt here
pp = PrePrm(pidx=pidx,
algo=algo,
salt=(creator.salt if not self.encrypter
else self.encrypter.encrypt(ser=creator.salt,
code=MtrDex.X25519_Cipher_Salt).qb64),
stem=creator.stem,
tier=creator.tier)
pre = csigners[0].verfer.qb64b
ipre = csigners[0].verfer.qb64
if not self.ks.pres.put(pre, val=Prefixer(qb64=pre)):
raise ValueError("Already incepted pre={}.".format(pre.decode("utf-8")))
if not self.ks.prms.put(pre, val=pp):
raise ValueError("Already incepted prm for pre={}.".format(pre.decode("utf-8")))
self.pidx = pidx + 1 # increment so unique
first = False
for signer in csigners: # store secrets (private key val keyed by public key)
self.ks.pris.put(keys=signer.verfer.qb64b, val=signer,
encrypter=self.encrypter)
pubs = [signer.verfer.qb64 for signer in csigners]
self.ks.pubs.put(riKey(pre, ri=ridx), val=PubSet(pubs=pubs))
dt = nowIso8601()
if ridx == max(iridx - 1, 0): # setup ps.old at this ridx
if iridx == 0:
old = PubLot() # defaults ok
else:
osigners = csigners
osith = "{:x}".format(max(1, math.ceil(len(osigners) / 2)))
ost = Tholder(sith=osith).sith
old=PubLot(pubs=pubs, ridx=ridx, kidx=kidx, dt=dt)
ps = PreSit(old=old) # .new and .nxt are default
if not self.ks.sits.pin(pre, val=ps):
raise ValueError("Problem updating pubsit db for pre={}.".format(pre))
if ridx == iridx: # setup ps.new at this ridx
if (ps := self.ks.sits.get(pre)) is None:
raise ValueError("Attempt to rotate nonexistent pre={}.".format(pre))
new=PubLot(pubs=pubs, ridx=ridx, kidx=kidx, dt=dt)
ps.new = new
if not self.ks.sits.pin(pre, val=ps):
raise ValueError("Problem updating pubsit db for pre={}.".format(pre))
if ridx == iridx + 1: # set up ps.nxt at this ridx
if (ps := self.ks.sits.get(pre)) is None:
raise ValueError("Attempt to rotate nonexistent pre={}.".format(pre))
nsigners = csigners
nxt=PubLot(pubs=pubs, ridx=ridx, kidx=kidx, dt=dt)
ps.nxt = nxt
if not self.ks.sits.pin(pre, val=ps):
raise ValueError("Problem updating pubsit db for pre={}.".format(pre))
ridx += 1 # next ridx
kidx += csize # next kidx
# create nxt signers after ingested signers
nsigners = creator.create(count=ncount, code=ncode,
pidx=pidx, ridx=ridx, kidx=kidx,
transferable=transferable, temp=temp)
for signer in nsigners: # store secrets (private key val keyed by public key)
self.ks.pris.put(keys=signer.verfer.qb64b, val=signer)
pubs = [signer.verfer.qb64 for signer in nsigners]
self.ks.pubs.put(riKey(pre, ri=ridx), val=PubSet(pubs=pubs))
if ridx == iridx + 1: # want to set up ps.next at this ridx
dt = nowIso8601()
if (ps := self.ks.sits.get(pre)) is None:
raise ValueError("Attempt to rotate nonexistent pre={}.".format(pre))
nxt=PubLot(pubs=pubs, ridx=ridx, kidx=kidx, dt=dt)
ps.nxt = nxt
if not self.ks.sits.pin(pre, val=ps):
raise ValueError("Problem updating pubsit db for pre={}.".format(pre))
return (ipre, verferies) #
[docs]
def replay(self, pre, dcode=MtrDex.Blake3_256, advance=True, erase=True):
"""
Returns duple (verfers, digers) associated with public key set from
the key sequence for identifier prefix pre at rotation index ridx stored
in db .pubs. Inception is at ridx == 0.
Enables replay of preexisting public key sequence.
In returned duple:
verfers is list of current public key verfers. Public key is verfer.qb64.
digers is list of next public key digers. Digest to xor is diger.raw.
If key sequence at ridx does already exist in .pubs database for pre then
raises ValueError.
If preexisting pubs for pre exist but .ridx is two large for preexisting
pubs then raises IndexError.
Parameters:
pre (str): fully qualified qb64 identifier prefix
dcode (str): derivation code for digers for next xor digest.
Default is MtrDex.Blake3_256
advance (bool): True means advance to next set of keys for replay
erase (bool): True means erase old private keys made stale by
advancement when advance is True otherwise ignore
"""
if (pp := self.ks.prms.get(pre)) is None:
raise ValueError("Attempt to replay nonexistent pre={}.".format(pre))
if (ps := self.ks.sits.get(pre)) is None:
raise ValueError("Attempt to replay nonexistent pre={}.".format(pre))
if advance:
old = ps.old # save prior old so can clean out if rotate successful
ps.old = ps.new # move prior new to old so save previous one step
ps.new = ps.nxt # move prior nxt to new which new is now current signer
ridx = ps.new.ridx
kidx = ps.new.kidx
csize = len(ps.new.pubs)
# Usually when next keys are null then aid is effectively non-transferable
# but when replaying injected keys reaching null next pub keys or
# equivalently default empty is the sign that we have reached the
# end of the replay so need to raise an IndexError
if not (pubset := self.ks.pubs.get(riKey(pre, ridx+1))):
# empty nxt public keys so end of replay
raise IndexError(f"Invalid replay attempt of pre={pre} at "
f"ridx={ridx}.")
pubs = pubset.pubs # create nxt from pubs
dt = nowIso8601()
nxt=PubLot(pubs=pubs, ridx=ridx+1, kidx=kidx+csize, dt=dt)
ps.nxt = nxt
verfers = [] # assign verfers from current new was prior nxt
for pub in ps.new.pubs:
if self.aeid and not self.decrypter: # maybe should rethink this
raise DecryptError("Unauthorized decryption attempt. "
"Aeid but no decrypter.")
if ((signer := self.ks.pris.get(pub.encode("utf-8"),
decrypter=self.decrypter)) is None):
raise ValueError("Missing prikey in db for pubkey={}".format(pub))
verfers.append(signer.verfer)
digers = [Diger(ser=pub.encode("utf-8"), code=dcode)
for pub in ps.nxt.pubs]
if advance:
if not self.ks.sits.pin(pre, val=ps):
raise ValueError("Problem updating pubsit db for pre={}.".format(pre))
if erase:
for pub in old.pubs: # remove prior old prikeys not current old
self.ks.pris.rem(pub)
return (verfers, digers)
[docs]
class ManagerDoer(doing.Doer):
"""
Basic Manager Doer to initialize keystore database .ks
Inherited Attributes:
.done is Boolean completion state:
True means completed
Otherwise incomplete. Incompletion maybe due to close or abort.
Attributes:
.manager is Manager subclass
Inherited Properties:
.tyme is float relative cycle time of associated Tymist .tyme obtained
via injected .tymth function wrapper closure.
.tymth is function wrapper closure returned by Tymist .tymeth() method.
When .tymth is called it returns associated Tymist .tyme.
.tymth provides injected dependency on Tymist tyme base.
.tock is float, desired time in seconds between runs or until next run,
non negative, zero means run asap
Properties:
Methods:
.wind injects ._tymth dependency from associated Tymist to get its .tyme
.__call__ makes instance callable
Appears as generator function that returns generator
.do is generator method that returns generator
.enter is enter context action method
.recur is recur context action method or generator method
.exit is exit context method
.close is close context method
.abort is abort context method
Hidden:
._tymth is injected function wrapper closure returned by .tymen() of
associated Tymist instance that returns Tymist .tyme. when called.
._tock is hidden attribute for .tock property
"""
[docs]
def __init__(self, manager, **kwa):
"""
Parameters:
manager (Manager): instance
"""
super(ManagerDoer, self).__init__(**kwa)
self.manager = manager
def enter(self, *, temp=None):
""""""
if not self.manager.inited:
self.manager.setup(**self.manager._inits)
def exit(self):
""""""
pass