# -*- encoding: utf-8 -*-
"""
KERI
keri.db.basing module
"""
import importlib
import os
import shutil
from collections import namedtuple
from contextlib import contextmanager
import lmdb
import semver
from ordered_set import OrderedSet as oset
from hio.base import doing
from hio.help import ogler
from keri import __version__
from .dbing import LMDBer, dgKey, openLMDB
from ..kering import (MissingEntryError, DatabaseError,
ConfigurationError, ValidationError,
Vrsn_1_0, Vrsn_2_0)
from ..recording import (KeyStateRecord, EventSourceRecord,
HabitatRecord, TopicsRecord,
OobiRecord, EndpointRecord,
LocationRecord, ObservedRecord,
CacheTypeRecord, TxnMsgCacheRecord,
MsgCacheRecord, WellKnownAuthN)
logger = ogler.getLogger()
def _strip_prerelease(version_str):
"""Strip prerelease and build metadata from a semver string.
Semver compares alphanumeric prerelease identifiers lexicographically,
so 'dev4' > 'dev10' (because '4' > '1'). Stripping prerelease ensures
dev releases within the same version cycle compare as equal.
See: https://github.com/WebOfTrust/keripy/issues/820
"""
ver = semver.VersionInfo.parse(version_str)
return str(semver.Version(ver.major, ver.minor, ver.patch))
MIGRATIONS = [
("0.6.8", ["hab_data_rename"]),
("1.0.0", ["add_key_and_reg_state_schemas"]),
("1.2.0", ["rekey_habs"])
]
# ToDo XXXX maybe
'''
class komerdict(dict):
"""
Subclass of dict that has db as attribute and employs read through cache
from db Baser.stts of kever states to reload kever from state in database
when not found in memory as dict item.
add method that answers is a given pre a group hab pre .localGroup(pre)
Todo add wrapper decorator to update attributes
on class that injects instance attributes when class is instanced
one of the injected parameters is function that that maps returned Komer to
object class
parameters are subdb (must be Komer) and function that maps retrieved dataclass
record from dataabase to class instance. if no mapping function then just
return the dataclass record as value.
"""
'''
[docs]
class statedict(dict):
"""
Subclass of dict that has db as attribute and employs read through cache
from db Baser.stts of kever states to reload kever from state in database
when not found in memory as dict item.
"""
__slots__ = ('db') # no .__dict__ just for db reference
def __init__(self, *pa, **kwa):
super(statedict, self).__init__(*pa, **kwa)
self.db = None
def __getitem__(self, k):
try:
return super(statedict, self).__getitem__(k)
except KeyError as ex:
if not self.db:
raise ex # reraise KeyError
if (ksr := self.db.states.get(keys=k)) is None:
raise ex # reraise KeyError
try:
from ..core.eventing import Kever
kever = Kever(state=ksr, db=self.db)
except MissingEntryError: # no kel event for keystate
raise ex # reraise KeyError
self.__setitem__(k, kever)
return kever
def __contains__(self, k):
if not super(statedict, self).__contains__(k):
try:
self.__getitem__(k)
return True
except KeyError:
return False
else:
return True
[docs]
def get(self, k, default=None):
"""Override of dict get method
Parameters:
k (str): key for dict
default: default value to return if not found
Returns:
kever: converted from underlying dict or database
"""
if not super(statedict, self).__contains__(k):
return default
else:
return self.__getitem__(k)
[docs]
def openDB(*, cls=None, name="test", **kwa):
"""
Returns contextmanager generated by openLMDB but with Baser instance as default
"""
if cls == None: # can't reference class before its defined below
cls = Baser
return openLMDB(cls=cls, name=name, **kwa)
[docs]
@contextmanager
def reopenDB(db, clear=False, **kwa):
"""
Context manager wrapper LMDB DB instances.
Repens and closes db.path and db.env LMDB
Parameters:
db (LMDBer): instance with LMDB environment at .env
clear (bool): True means clear directory after close
Usage:
with reopenDB(baser) as env:
env. ....
"""
try:
db.reopen(clear=clear, **kwa)
yield db.env
finally:
db.close(clear=clear)
KERIBaserMapSizeKey = "KERI_BASER_MAP_SIZE"
[docs]
class Baser(LMDBer):
"""
Baser sets up named sub databases with Keri Event Logs within main database
Attributes:
see superclass LMDBer for inherited attributes
kevers (dbdict): read-through cache of Kever instances indexed by
identifier prefix qb64
prefixes (OrderedSet): local prefixes corresponding to habitats for
this db
groups (OrderedSet): group hab identifier prefixes for this db
.evts is named subDB instance of SerderSuber whose values are serialized
key events
subkey 'evts.'
dgKey (prefix + digest)
DB is keyed by identifier prefix plus digest of serialized event
Only one value per DB key is allowed
.fels is named subDB instance of OnSuber for first seen event logs (FEL)
as indices mapping first-seen ordinal fn to event digests.
Actual serialized key events are stored in .evts by SAID digest
Indexed in first-seen accepted order for replay and cloning.
subkey 'fels.'
Key: identifier prefix + monotonically increasing fn.
Value: qb64 str of event digest used to lookup event in .evts.
Only one value per DB key is allowed.
Append-only ordering of accepted first-seen events.
.kels is named subDB instance of OnIoDupSuber for key event logs as indices
mapping composite key "<pre><sep><on>" to serialized key event digests.
Actual serialized key events are stored in .evts by SAID digest.
subkey 'kels.'
Key: identifier prefix + sequence number.
Value: qb64 digest used to lookup event in .evts.
More than one value per DB key is allowed.
.dtss is named subDB instance of CesrSuber (klas=Dater) for datetime
stamps of when the event was first escrowed and then later first
seen by log. Used for escrow timeouts and extended validation.
subkey 'dtss.'
dgKey (prefix + digest)
Value: Dater instance
Only one value per DB key is allowed.
.aess is named subDB instance of CatCesrSuber (klas=(Number, Diger))
for authorizing event source seal couples that map digest of key
event to seal source couple of authorizer's (delegator or issuer)
event.
subkey 'aess.'
dgKey (prefix + digest)
Value: (Number, Diger) tuple; Number serialized as Huge
(fixed 24-char), used to lookup authorizer's source event in .kels.
Only one value per DB key is allowed.
.sigs is named subDB instance of CesrIoSetSuber (klas=Siger) for
fully qualified indexed event signatures from the controller.
subkey 'sigs.'
dgKey (prefix + digest)
More than one value per DB key is allowed.
.wigs is named subDB instance of CesrIoSetSuber (klas=Siger) for
indexed witness signatures of events that may come directly or be
derived from a witness receipt message. Witnesses always have
nontransferable identifier prefixes. The index is the offset of
the witness into the witness list of the most recent establishment
event wrt the receipted event.
subkey 'wigs.'
dgKey (prefix + digest)
More than one value per DB key is allowed.
.rcts is named subDB instance of CatCesrIoSetSuber (klas=(Prefixer, Cigar))
for event receipt couplets from nontransferable signers.
These are endorsements from nontransferable signers who are not witnesses
May be watchers or other
Each entry is a (Prefixer, Cigar) duple.
subkey 'rcts.'
dgKey (prefix + digest)
Multiple values per key stored as ordered set (duplicates ignored,
insertion order preserved).
.ures is named subDB instance of CatCesrIoSetSuber
(klas=(Diger, Prefixer, Cigar)) for unverified event receipt
escrowed triples from nontransferable signers. Each triple is
(receipted event digest, receiptor prefix, receipt signature).
Used to escrow receipt couples until the receipted event appears.
subkey 'ures.'
snKey (prefix + sequence number)
More than one value per DB key is allowed.
.vrcs is named subDB instance of CatCesrIoSetSuber
(klas=(Prefixer, Number, Diger, Siger)) for verified transferable-
validator receipt quadruples. Each stored value is a typed CESR
tuple (Prefixer, Number, Diger, Siger) representing a validator's
AID, its latest establishment-event sequence number, digest, and
its indexed signature over the event. Values preserved in insertion
order. Represents fully validated receipts moved out of escrow.
subkey 'vrcs.'
dgKey (prefix + digest)
Multiple values per key stored as ordered set.
.vres is named subDB instance of CatCesrIoSetSuber for escrowed
transferable-receipt quintuples. Each value is a typed CESR tuple
(Diger, Prefixer, Number, Diger, Siger) representing a validator's
receipt escrow entry. Holds unverified transferable receipts until
validated and moved into .vrcs.
subkey 'vres.'
snKey (prefix + sequence number)
Values stored in insertion order.
.pses is named subDB instance of OnIoDupSuber for partially-signed
event escrows under composite keys "<pre><sep><on>". Tracks events
with at least one verified signature but not yet fully validated
due to missing signatures or dependent events.
subkey 'pses.'
Key: identifier prefix + sequence number.
Values stored in insertion order.
.pwes is named subDB instance of OnIoDupSuber for partially witnessed
key event escrows under composite keys "<pre><sep><on>" to
serialized event digest. Escrows events with verified signatures
but not yet verified witness receipts.
subkey 'pwes.'
Key: identifier prefix + sequence number.
More than one value per DB key is allowed.
.pdes is named subDB instance of OnIoDupSuber for partially delegated
key event escrows that map prefix + sequence number to serialized
event digest. Used in conjunction with .udes which escrows the
associated seal source couple.
subkey 'pdes.'
snKey (prefix + sequence number)
More than one value per DB key is allowed.
.udes is named subDB instance of CatCesrSuber (klas=(Number, Diger))
for unverified delegation seal source couple escrows that map
(prefix, digest) of delegated event to delegating seal source
couple (sn, dig) that provides the source delegator event seal.
Each couple is (Number(num=sn).qb64b, Diger.qb64b) used to lookup
the source event in the delegator's KEL. Once accepted, entries
move into .aess.
subkey 'udes.'
dgKey (prefix + digest)
Only one value per DB key is allowed.
.uwes is named subDB instance of B64OnIoSetSuber for unverified event
indexed escrowed couples from witness signers. Each couple is
(edig, wig) where edig is receipted event digest and wig is the
indexed witness signature derived from the witness nontrans prefix
and offset into the witness list of the latest establishment event.
subkey 'uwes.'
Key: receipted event controller prefix + sequence number.
Multiple values per key are stored in insertion order as a set.
.ooes is named subDB instance of OnIoDupSuber for out-of-order event
escrows under composite keys "<pre><sep><on>". Tracks events whose
prior event has not yet been accepted into the KEL.
subkey 'ooes.'
Key: identifier prefix + sequence number.
Values stored in insertion order.
.dels is named subDB instance of OnIoDupSuber for duplicitous event
log tables that map identifier prefix plus sequence number to
serialized event digests.
subkey 'dels.'
snKey (prefix + sequence number)
Values are qb64 digests used to lookup event in .evts.
More than one value per DB key is allowed (insertion ordered).
.ldes is named subDB instance of OnIoDupSuber for likely duplicitous
escrowed event tables that map identifier prefix plus sequence
number to serialized event digests.
subkey 'ldes.'
snKey (prefix + sequence number)
Values are qb64 digests used to lookup event in .evts.
More than one value per DB key is allowed (insertion ordered).
.qnfs is named subDB instance of IoSetSuber for queued not-first-seen
event escrows. Maps (prefix, said) to event digest.
subkey 'qnfs.'
dupsort=True
More than one value per DB key is allowed.
.fons is named subDB instance of CesrSuber (klas=Number) mapping
prefix and digest to fn value (first seen ordinal number) of the
associated event. Given pre and event digest, retrieve fn here then
fetch event from .fels. Ensures any event looked up this way was
first seen at some point, even if later superseded by a recovery
rotation. Direct lookup in .evts could return escrowed events that
may never have been accepted as first seen.
subkey 'fons.'
dgKey (prefix + digest)
Only one value per DB key is allowed.
.migs is named subDB instance of CesrSuber (klas=Dater) tracking
completed migrations. Maps migration module name to the Dater
timestamp of when it was run.
subkey 'migs.'
Key: migration name str.
Only one value per DB key is allowed.
.vers is named subDB instance of Suber storing the current database
schema version string.
subkey 'vers.'
.esrs is named subDB instance of Komer (schema=EventSourceRecord)
tracking the source of each event. When .local is Truthy the event
was sourced in a protected way (generated locally or via a protected
path). When .local is Falsey the event was NOT sourced in a
protected way. The value of .local determines what validation logic
to run. Used to track source when processing escrows that would
otherwise be decoupled from the original source of the event.
subkey 'esrs.'
dgKey (prefix + digest)
Only one value per DB key is allowed.
.misfits is named subDB instance of OnIoSetSuber for misfit escrows.
Events with remote (nonlocal) sources that are inappropriate (i.e.
would be dropped) unless promoted to local source via extra
after-the-fact authentication. Escrow processing determines if and
how to promote event source to local and then reprocess.
subkey 'mfes.'
snKey (prefix + sequence number)
Value: qb64b digest of event.
.delegables is named subDB instance of IoSetSuber for delegable event
escrows of key events with a local delegator that need approval.
Approval is via anchoring of the delegated event seal in the
delegator's KEL. Event source must be local. A nonlocal (remote)
source must first pass through .misfits and be promoted to local.
subkey 'dees.'
snKey (prefix + sequence number)
Value: qb64b digest of event.
.states is named subDB instance of Komer (schema=KeyStateRecord)
mapping a prefix to its latest key state. Used as read-through
cache backing .kevers to reload Kever instances from persistent
storage.
subkey 'stts.'
Key: identifier prefix.
Only one value per DB key is allowed.
.wits is named subDB instance of CesrIoSetSuber (klas=Prefixer)
storing the current witness set for an identifier.
subkey 'wits.'
Key: identifier prefix.
Multiple values per key (one per witness).
.habs is named subDB instance of Komer (schema=HabitatRecord) mapping
habitat names to habitat application state including identifier
prefix.
subkey 'habs.'
Key: habitat name str.
Only one value per DB key is allowed.
.names is named subDB instance of Suber (sep='^') mapping
(namespace, name) to identifier prefix. Provides namespace-scoped
name lookup for habitats.
subkey 'names.'
Key: namespace + '^' + name.
.sdts is named subDB instance of CesrSuber (klas=Dater) mapping SAD
SAID to Dater CESR serialization of ISO-8601 datetime
(sad date-time stamp).
subkey 'sdts.'
Key: said (bytes) of SAD.
Only one value per DB key is allowed.
.ssgs is named subDB instance of CesrIoSetSuber (klas=Siger) for SAD
transferable indexed signatures. Maps quadruple key
(diger.qb64, prefixer.qb64, number.qb64, diger.qb64) to Siger
of the transferable signer's signature. Diger is the SAID of the
SAD; prefixer, number, and diger indicate the key state
establishment event for the signer.
subkey 'ssgs.'
Key: join(diger.qb64b, prefixer.qb64b, number.qb64b, diger.qb64b)
Multiple values per key (one per signer, insertion ordered).
.scgs is named subDB instance of CatCesrIoSetSuber
(klas=(Verfer, Cigar)) for SAD nontransferable signatures. Maps
SAD SAID to (Verfer, Cigar) couple for each nontransferable signer.
For nontransferable signers, qb64 of Verfer equals Prefixer.
subkey 'scgs.'
Key: said (bytes) of SAD.
Multiple values per key (one per nontransferable signer, insertion
ordered).
.rpys is named subDB instance of SerderSuber for reply messages. Maps
reply SAID to serialization of the reply message (versioned SAD).
Use .sdts, .ssgs, and .scgs for associated datetimes and
signatures.
subkey 'rpys.'
Key: said bytes.
Only one value per DB key is allowed.
.rpes is named subDB instance of CesrIoSetSuber (klas=Diger) for
reply escrows. Maps reply route to Diger of the escrowed reply
message. Routes such as '/end/role' and '/loc/scheme'.
subkey 'rpes.'
Key: route bytes.
Multiple values per key.
.eans is named subDB instance of CesrSuber (klas=Diger) for endpoint
role authorizations. Maps cid.role.eid to SAID of the reply SAD
that authN by controller cid authZ endpoint provider eid in the
given role. Routes /end/role/add and /end/role/cut to nullify.
subkey 'eans.'
Key: cid.role.eid.
Only one value per DB key is allowed.
.lans is named subDB instance of CesrSuber (klas=Diger) for location
authorizations. Maps eid.scheme to SAID of the reply SAD that
authN by endpoint provider eid designates scheme URL.
Route /loc/scheme; null URL nullifies.
subkey 'lans.'
Key: eid.scheme.
Only one value per DB key is allowed.
.ends is named subDB instance of Komer (schema=EndpointRecord) mapping
(cid, role, eid) to EndpointRecord attributes about endpoint
authorization. cid is controller prefix, role is endpoint role
(e.g. watcher), eid is controller prefix of endpoint provider.
Data extracted from reply /end/role/add or /end/role/cut.
subkey 'ends.'
Key: cid.role.eid.
.locs is named subDB instance of Komer (schema=LocationRecord) mapping
endpoint prefix eid and network location scheme to endpoint
location details. Data extracted from reply /loc/scheme.
subkey 'locs.'
Key: eid.scheme.
.obvs is named subDB instance of Komer (schema=ObservedRecord) for
observed OIDs by watcher. Maps (cid, aid, oid) to ObservedRecord.
subkey 'obvs.'
Key: cid.aid.oid.
.tops is named subDB instance of Komer (schema=TopicsRecord) mapping
witness identifier prefix to the topic index of the last recieved
mailbox message.
subkey 'witm.'
Key: witness prefix identifier.
.gpse is named subDB instance of CatCesrIoSetSuber
(klas=(Number, Diger)) for group multisig partial signature
escrows.
subkey 'gpse.'
Multiple values per key.
.gdee is named subDB instance of CatCesrIoSetSuber
(klas=(Number, Diger)) for group multisig delegate escrows.
subkey 'gdee.'
Multiple values per key.
.gpwe is named subDB instance of CatCesrIoSetSuber
(klas=(Number, Diger)) for group multisig partial witness escrows.
subkey 'gdwe.'
Multiple values per key.
.cgms is named subDB instance of CesrSuber (klas=Diger) for completed
group multisig events. Maps key to Diger of completed event.
subkey 'cgms.'
Only one value per DB key is allowed.
.epse is named subDB instance of SerderSuber for exchange message
partial signature escrows. Maps key to serialized Serder of the
escrowed exchange message.
subkey 'epse.'
.epsd is named subDB instance of CesrSuber (klas=Dater) for exchange
message partial signature escrow datetimes. Maps key to Dater
timestamp of the escrowed message.
subkey 'epsd.'
.exns is named subDB instance of SerderSuber for accepted exchange
messages. Maps key to serialized Serder of the exchange message.
subkey 'exns.'
.erpy is named subDB instance of CesrSuber (klas=Saider) as a forward
pointer to a provided reply message associated with an exchange
message.
subkey 'erpy.'
Only one value per DB key is allowed.
.esigs is named subDB instance of CesrIoSetSuber (klas=Siger) for
exchange message transferable indexed signatures.
subkey 'esigs.'
Multiple values per key.
.ecigs is named subDB instance of CatCesrIoSetSuber
(klas=(Verfer, Cigar)) for exchange message nontransferable
signatures. Maps key to (Verfer, Cigar) couples.
subkey 'ecigs.'
Multiple values per key.
.epath is named subDB instance of IoSetSuber for exchange message
pathed attachments.
subkey 'epath.'
Multiple values per key.
.essrs is named subDB instance of CesrIoSetSuber (klas=Texter) for
exchange message event source records.
subkey 'essrs.'
Multiple values per key.
.chas is named subDB instance of CesrIoSetSuber (klas=Diger) for
accepted signed 12-word challenge response exn messages. Keyed by
prefix of signer.
subkey 'chas.'
Multiple values per key.
.reps is named subDB instance of CesrIoSetSuber (klas=Diger) for
successful signed 12-word challenge response exn messages. Keyed
by prefix of signer.
subkey 'reps.'
Multiple values per key.
.wkas is named subDB instance of IoSetKomer (schema=WellKnownAuthN)
for authorized well-known OOBIs.
subkey 'wkas.'
Multiple values per key.
.kdts is named subDB instance of CesrSuber (klas=Dater) mapping key
state SAID to ISO-8601 datetime stamp (ksn date-time stamp).
subkey 'kdts.'
Key: said (bytes).
Only one value per DB key is allowed.
.ksns is named subDB instance of Komer (schema=KeyStateRecord) for
key state notice messages. Maps key state SAID to KeyStateRecord.
Use .kdts for associated datetimes and signatures.
subkey 'ksns.'
.knas is named subDB instance of CesrSuber (klas=Diger) for key state
SAID records of successfully saved key state notices. Maps
(prefix, aid) to SAID of key state.
subkey 'knas.'
Only one value per DB key is allowed.
.wwas is named subDB instance of CesrSuber (klas=Diger) for watcher
watched SAID records. Maps (cid, aid, oid) to SAID of the reply
message for successfully saved watched AIDs.
subkey 'wwas.'
Only one value per DB key is allowed.
.oobis is named subDB instance of Komer (schema=OobiRecord, sep='>')
for configured OOBIs to be processed asynchronously. Keyed by
OOBI URL. sep='>' prevents splitting as '>' is not valid in URLs.
subkey 'oobis.'
.eoobi is named subDB instance of Komer (schema=OobiRecord, sep='>')
for OOBIs that failed to load and are pending retry. Keyed by
OOBI URL.
subkey 'eoobi.'
.coobi is named subDB instance of Komer (schema=OobiRecord, sep='>')
for OOBIs with outstanding client requests. Keyed by OOBI URL.
subkey 'coobi.'
.roobi is named subDB instance of Komer (schema=OobiRecord, sep='>')
for resolved OOBIs that have been successfully processed. Keyed
by OOBI URL.
subkey 'roobi.'
.woobi is named subDB instance of Komer (schema=OobiRecord, sep='>')
for well-known OOBIs used for MFA against a resolved OOBI. Keyed
by OOBI URL.
subkey 'woobi.'
.moobi is named subDB instance of Komer (schema=OobiRecord, sep='>')
for multifactor well-known OOBI records. Keyed by OOBI URL.
subkey 'moobi.'
.mfa is named subDB instance of Komer (schema=OobiRecord, sep='>')
for multifactor well-known OOBI auth records pending processing.
Keyed by controller URL.
subkey 'mfa.'
.rmfa is named subDB instance of Komer (schema=OobiRecord, sep='>')
for resolved multifactor well-known OOBI auth records. Keyed by
controller URL.
subkey 'rmfa.'
.schema is named subDB instance of SchemerSuber storing JSON schema
SADs keyed by SAID of the schema.
subkey 'schema.'
.cfld is named subDB instance of Suber for contact field values for
remote identifiers. Keyed by prefix/field.
subkey 'cfld.'
.hbys is named subDB instance of Suber for global settings of the
Habery environment.
subkey 'hbys.'
.cons is named subDB instance of Suber for signed contact data. Keyed
by prefix.
subkey 'cons.'
.ccigs is named subDB instance of CesrSuber (klas=Cigar) for
transferable signatures on contact data. Keyed by prefix.
subkey 'ccigs.'
Only one value per DB key is allowed.
.imgs is raw LMDB sub database for chunked image data for contact
information of remote identifiers. Keyed by prefix/chunk-index.
subkey b'imgs.'
Raw bytes values; accessed directly via env.open_db.
.ifld is named subDB instance of Suber for identifier field values
for local identifiers. Keyed by prefix/field.
subkey 'ifld.'
.sids is named subDB instance of Suber for signed local identifier
data. Keyed by prefix.
subkey 'sids.'
.icigs is named subDB instance of CesrSuber (klas=Cigar) for
transferable signatures on local identifier data. Keyed by prefix.
subkey 'icigs.'
Only one value per DB key is allowed.
.iimgs is raw LMDB sub database for chunked image data for local
identifier information. Keyed by prefix/chunk-index.
subkey b'iimgs.'
Raw bytes values; accessed directly via env.open_db.
.dpwe is named subDB instance of SerderSuber for delegated partial
witness escrows. Maps key to serialized Serder of the escrowed
delegated event.
subkey 'dpwe.'
.dune is named subDB instance of SerderSuber for delegated unanchored
escrows. Maps key to serialized Serder of the unanchored delegated
event awaiting delegation anchor.
subkey 'dune.'
.dpub is named subDB instance of SerderSuber for delegate publication
escrows used to send delegator info to the delegate's witnesses.
subkey 'dpub.'
.cdel is named subDB instance of CesrOnSuber (klas=Diger) for
completed group delegated AIDs. Maps ordinal key to Diger of the
completed delegation event.
subkey 'cdel.'
.meids is named subDB instance of CesrIoSetSuber (klas=Diger) mapping
multisig embed payload SAID to the SAIDs of the exn messages that
contained it. Aggregates identical message bodies across group
multisig participants reaching consensus on events or credentials.
subkey 'meids.'
Multiple values per key.
.maids is named subDB instance of CesrIoSetSuber (klas=Prefixer)
mapping multisig embed payload SAID to the AIDs of the group
multisig participants that contributed it.
subkey 'maids.'
Multiple values per key.
.kramCTYP is named subDB instance of Komer (schema=CacheTypeRecord) for
KRAM cache type records. Maps expression string to drift and lag
parameters.
subkey 'ctyp.'
.kramMSGC is named subDB instance of Komer (schema=MsgCacheRecord) for
KRAM message cache. Maps (AID, MID) to message datetime, drift,
and lag values.
subkey 'msgc.'
.kramTMSC is named subDB instance of Komer (schema=TxnMsgCacheRecord) for
KRAM transactioned message cache. Maps (AID, XID, MID) to
datetimes, drift, and lag values.
subkey 'tmsc.'
.kramPMKM is named subDB instance of SerderSuber for KRAM partially signed
multi-key messages. Maps (AID, MID) key to the associated
SerderKERI message.
subkey 'pmkm.'
.kramPMKS is named subDB instance of CesrIoSetSuber (klas=Siger) for
KRAM partially signed multi-key signatures. Maps (AID, MID) key
to associated Siger instances.
subkey 'pmks.'
Multiple values per key.
.kramPMSK is named subDB instance of CatCesrSuber (klas=(Number, Diger))
for KRAM partially signed multi-key sender key state records. Maps
(AID, MID) key to (sn, event SAID) couple identifying the sender's
key state.
subkey 'pmsk.'
Only one value per DB key is allowed.
.kramTRQS is named subDB instance of CatCesrIoSetSuber for KRAM partially
signed multi-key trans receipt quadruple attachments.
subkey 'trqs.'
DB is keyed by (AID, MID): sender identifier prefix plus message SAID
Value is (Prefixer, Number, Diger, Siger) tuple. Sourced from
parser kwa key 'trqs'.
Multiple values per key stored as ordered set (duplicates ignored).
Entries persist until removed by the KRAM pruner.
.kramTSGS is named subDB instance of CatCesrIoSetSuber for KRAM partially
signed multi-key trans last sig group attachments. Each group is
stored per-siger as a flat (Prefixer, Seqner, Saider, Siger) tuple.
subkey 'tsgs.'
DB is keyed by (AID, MID): sender identifier prefix plus message SAID
Value is (Prefixer, Number, Diger, Siger) tuple. Sourced from
parser kwa key 'tsgs'.
Multiple values per key stored as ordered set (duplicates ignored).
Entries persist until removed by the KRAM pruner.
.kramSSCS is named subDB instance of CatCesrIoSetSuber for KRAM partially
signed multi-key first seen seal couple attachments from issuing or
delegating events.
subkey 'sscs.'
DB is keyed by (AID, MID): sender identifier prefix plus message SAID
Value is (Number, Diger) tuple. Sourced from parser kwa key 'sscs'.
Multiple values per key stored as ordered set (duplicates ignored).
Entries persist until removed by the KRAM pruner.
.kramSSTS is named subDB instance of CatCesrIoSetSuber for KRAM partially
signed multi-key source seal triple attachments from issued or
delegated events.
subkey 'ssts.'
DB is keyed by (AID, MID): sender identifier prefix plus message SAID
Value is (Prefixer, Number, Diger) tuple. Sourced from parser kwa
key 'ssts'.
Multiple values per key stored as ordered set (duplicates ignored).
Entries persist until removed by the KRAM pruner.
.kramFRCS is named subDB instance of CatCesrIoSetSuber for KRAM partially
signed multi-key first seen replay couple attachments.
subkey 'frcs.'
DB is keyed by (AID, MID): sender identifier prefix plus message SAID
Value is (Number, Dater) tuple. Sourced from parser kwa key 'frcs'.
Multiple values per key stored as ordered set (duplicates ignored).
Entries persist until removed by the KRAM pruner.
.kramTDCS is named subDB instance of CatCesrIoSetSuber for KRAM partially
signed multi-key typed digest seal couple attachments.
subkey 'tdcs.'
DB is keyed by (AID, MID): sender identifier prefix plus message SAID
Value is (Verser, Diger) tuple. Sourced from parser kwa key 'tdcs'.
Multiple values per key stored as ordered set (duplicates ignored).
Entries persist until removed by the KRAM pruner.
.kramPTDS is named subDB instance of IoSetSuber for KRAM partially signed
multi-key pathed stream attachments.
subkey 'ptds.'
DB is keyed by (AID, MID): sender identifier prefix plus message SAID
Value is raw bytes of pathed CESR stream. Sourced from parser kwa
key 'ptds'.
Multiple values per key stored as ordered set (duplicates ignored).
Entries persist until removed by the KRAM pruner.
.kramBSQS is named subDB instance of CatCesrIoSetSuber for KRAM partially
signed multi-key blind state quadruple attachments.
subkey 'bsqs.'
DB is keyed by (AID, MID): sender identifier prefix plus message SAID
Value is (Diger, Noncer, Noncer, Labeler) tuple. Sourced from
parser kwa key 'bsqs'.
Multiple values per key stored as ordered set (duplicates ignored).
Entries persist until removed by the KRAM pruner.
.kramBSSS is named subDB instance of CatCesrIoSetSuber for KRAM partially
signed multi-key bound state sextuple attachments.
subkey 'bsss.'
DB is keyed by (AID, MID): sender identifier prefix plus message SAID
Value is (Diger, Noncer, Noncer, Labeler, Number, Noncer) tuple.
Sourced from parser kwa key 'bsss'.
Multiple values per key stored as ordered set (duplicates ignored).
Entries persist until removed by the KRAM pruner.
.kramTMQS is named subDB instance of CatCesrIoSetSuber for KRAM partially
signed multi-key type media quadruple attachments.
subkey 'tmqs.'
DB is keyed by (AID, MID): sender identifier prefix plus message SAID
Value is (Diger, Noncer, Labeler, Texter) tuple. Sourced from
parser kwa key 'tmqs'.
Multiple values per key stored as ordered set (duplicates ignored).
Entries persist until removed by the KRAM pruner.
Properties:
kevers (statedict): read through cache of kevers of states for KELs in db
"""
[docs]
def __init__(self, headDirPath=None, reopen=False, **kwa):
"""
Setup named sub databases.
Parameters:
name is str directory path name differentiator for main database
When system employs more than one keri database, name allows
differentiating each instance by name
temp is boolean, assign to .temp
True then open in temporary directory, clear on close
Othewise then open persistent directory, do not clear on close
headDirPath is optional str head directory pathname for main database
If not provided use default .HeadDirpath
mode is int numeric os dir permissions for database directory
reopen (bool): True means database will be reopened by this init
"""
self.prefixes = oset() # should change to hids for hab ids
self.groups = oset() # group hab ids
self._kevers = statedict()
self._kevers.db = self # assign db for read through cache of kevers
if (mapSize := os.getenv(KERIBaserMapSizeKey)) is not None:
try:
self.MapSize = int(mapSize)
except ValueError:
logger.error("KERI_BASER_MAP_SIZE must be an integer value >1!")
raise
super(Baser, self).__init__(headDirPath=headDirPath, reopen=reopen, **kwa)
@property
def kevers(self):
"""
Returns .db.kevers
"""
return self._kevers
[docs]
def reopen(self, **kwa):
"""
Open sub databases
Notes:
dupsort=True for sub DB means allow unique (key,pair) duplicates at a key.
Duplicate means that is more than one value at a key but not a redundant
copies a (key,value) pair per key. In other words the pair (key,value)
must be unique both key and value in combination.
Attempting to put the same (key,value) pair a second time does
not add another copy.
Duplicates are inserted in lexocographic order by value, insertion order.
"""
from . import koming, subing
from ..core import coring, indexing
super(Baser, self).reopen(**kwa)
# Create by opening first time named sub DBs within main DB instance
# Names end with "." as sub DB name must include a non Base64 character
# to avoid namespace collisions with Base64 identifier prefixes.
self.evts = subing.SerderSuber(db=self, subkey='evts.')
self.fels = subing.OnSuber(db=self, subkey='fels.')
self.kels = subing.OnIoDupSuber(db=self, subkey='kels.')
self.dtss = subing.CesrSuber(db=self, subkey='dtss.', klas=coring.Dater)
self.aess = subing.CatCesrSuber(db=self, subkey='aess.',
klas=(coring.Number, coring.Diger))
self.sigs = subing.CesrIoSetSuber(db=self, subkey='sigs.',
klas=(indexing.Siger))
self.wigs = subing.CesrIoSetSuber(db=self, subkey='wigs.', klas=indexing.Siger)
self.rcts = subing.CatCesrIoSetSuber(db=self, subkey="rcts.",
klas=(coring.Prefixer, coring.Cigar))
self.ures = subing.CatCesrIoSetSuber(db=self, subkey='ures.',
klas=(coring.Diger, coring.Prefixer, coring.Cigar))
self.vrcs = subing.CatCesrIoSetSuber(db=self, subkey='vrcs.',
klas=(coring.Prefixer, coring.Number, coring.Diger, indexing.Siger))
self.vres = subing.CatCesrIoSetSuber(db=self, subkey='vres.',
klas=(coring.Diger, coring.Prefixer, coring.Number, coring.Diger, indexing.Siger))
self.pses = subing.OnIoDupSuber(db=self, subkey='pses.')
self.pwes = subing.OnIoDupSuber(db=self, subkey='pwes.')
self.pdes = subing.OnIoDupSuber(db=self, subkey='pdes.')
self.udes = subing.CatCesrSuber(db=self, subkey='udes.', klas=(coring.Number, coring.Diger))
self.uwes = subing.B64OnIoSetSuber(db=self, subkey='uwes.')
self.ooes = subing.OnIoDupSuber(db=self, subkey='ooes.')
self.dels = subing.OnIoDupSuber(db=self, subkey='dels.')
self.ldes = subing.OnIoDupSuber(db=self, subkey='ldes.')
self.qnfs = subing.IoSetSuber(db=self, subkey="qnfs.", dupsort=True)
# events as ordered by first seen ordinals
self.fons = subing.CesrSuber(db=self, subkey='fons.', klas=coring.Number)
self.migs = subing.CesrSuber(db=self, subkey="migs.", klas=coring.Dater)
self.vers = subing.Suber(db=self, subkey="vers.")
# event source local (protected) or non-local (remote not protected)
self.esrs = koming.Komer(db=self,
klas=EventSourceRecord,
subkey='esrs.')
# misfit escrows whose processing may change the .esrs event source record
self.misfits = subing.OnIoSetSuber(db=self, subkey='mfes.')
# delegable events escrows. events with local delegator that need approval
self.delegables = subing.IoSetSuber(db=self, subkey='dees.')
# Kever state made of KeyStateRecord key states
# TODO: clean
self.states = koming.Komer(db=self,
klas=KeyStateRecord,
subkey='stts.')
self.wits = subing.CesrIoSetSuber(db=self, subkey="wits.", klas=coring.Prefixer)
# habitat application state keyed by habitat name, includes prefix
self.habs = koming.Komer(db=self,
subkey='habs.',
klas=HabitatRecord, )
# habitat name database mapping (domain,name) as key to Prefixer
self.names = subing.Suber(db=self, subkey='names.', sep="^")
# SAD support datetime stamps and signatures indexed and not-indexed
# all sad sdts (sad datetime serializations) maps said to date-time
self.sdts = subing.CesrSuber(db=self, subkey='sdts.', klas=coring.Dater)
# all sad ssgs (sad indexed signature serializations) maps SAD quadkeys
# given by quadruple (diger.qb64, prefixer.qb64, seqner.q64, diger.qb64)
# of reply and trans signer's key state est evt to val Siger for each
# signature.
self.ssgs = subing.CesrIoSetSuber(db=self, subkey='ssgs.', klas=indexing.Siger)
# all sad scgs (sad non-indexed signature serializations) maps SAD SAID
# to couple (Verfer, Cigar) of nontrans signer of signature in Cigar
# nontrans qb64 of Prefixer is same as Verfer
self.scgs = subing.CatCesrIoSetSuber(db=self, subkey='scgs.',
klas=(coring.Verfer, coring.Cigar))
# all reply messages. Maps reply said to serialization. Replys are
# versioned sads ( with version string) so use Serder to deserialize and
# use .sdts, .ssgs, and .scgs for datetimes and signatures
# TODO: clean
self.rpys = subing.SerderSuber(db=self, subkey='rpys.')
# all reply escrows indices of partially signed reply messages. Maps
# route in reply to single (Diger,) of escrowed reply.
# Routes such as /end/role /loc/schema
self.rpes = subing.CesrIoSetSuber(db=self, subkey='rpes.',
klas=coring.Diger)
# auth AuthN/AuthZ by controller at cid of endpoint provider at eid
# maps key=cid.role.eid to val=diger of end reply
self.eans = subing.CesrSuber(db=self, subkey='eans.', klas=coring.Diger)
# auth AuthN/AuthZ by endpoint provider at eid of location at scheme url
# maps key=cid.role.eid to val=diger of end reply
self.lans = subing.CesrSuber(db=self, subkey='lans.', klas=coring.Diger)
# service endpoint identifier (eid) auths keyed by controller cid.role.eid
# data extracted from reply /end/role/add or /end/role/cut
self.ends = koming.Komer(db=self, subkey='ends.',
klas=EndpointRecord, )
# service endpoint locations keyed by eid.scheme (endpoint identifier)
# data extracted from reply loc
self.locs = koming.Komer(db=self,
subkey='locs.',
klas=LocationRecord, )
# observed oids by watcher by cid.aid.oid (endpoint identifier)
# data extracted from reply loc
self.obvs = koming.Komer(db=self,
subkey='obvs.',
klas=ObservedRecord, )
# index of last retrieved message from witness mailbox
# TODO: clean
self.tops = koming.Komer(db=self,
subkey='witm.',
klas=TopicsRecord, )
# group partial signature escrow
self.gpse = subing.CatCesrIoSetSuber(db=self, subkey='gpse.',
klas=(coring.Number, coring.Diger))
# group delegate escrow
self.gdee = subing.CatCesrIoSetSuber(db=self, subkey='gdee.',
klas=(coring.Number, coring.Diger))
# group partial witness escrow
self.gpwe = subing.CatCesrIoSetSuber(db=self, subkey='gdwe.',
klas=(coring.Number, coring.Diger))
# completed group multisig
# TODO: clean
self.cgms = subing.CesrSuber(db=self, subkey='cgms.',
klas=coring.Diger)
# exchange message partial signature escrow
self.epse = subing.SerderSuber(db=self, subkey="epse.")
# exchange message PS escrow date time of message
self.epsd = subing.CesrSuber(db=self, subkey="epsd.",
klas=coring.Dater)
# exchange messages
# TODO: clean
self.exns = subing.SerderSuber(db=self, subkey="exns.")
# Forward pointer to a provided reply message
# TODO: clean
self.erpy = subing.CesrSuber(db=self, subkey="erpy.", klas=coring.Saider)
# exchange message signatures
# TODO: clean
self.esigs = subing.CesrIoSetSuber(db=self, subkey='esigs.', klas=indexing.Siger)
# exchange message signatures
# TODO: clean
self.ecigs = subing.CatCesrIoSetSuber(db=self, subkey='ecigs.',
klas=(coring.Verfer, coring.Cigar))
# exchange pathed attachments
# TODO: clean
self.epath = subing.IoSetSuber(db=self, subkey="epath.")
self.essrs = subing.CesrIoSetSuber(db=self, subkey="essrs.", klas=coring.Texter)
# accepted signed 12-word challenge response exn messages keys by prefix of signer
# TODO: clean
self.chas = subing.CesrIoSetSuber(db=self, subkey='chas.', klas=coring.Diger)
# successfull signed 12-word challenge response exn messages keys by prefix of signer
# TODO: clean
self.reps = subing.CesrIoSetSuber(db=self, subkey='reps.', klas=coring.Diger)
# authorzied well known OOBIs
# TODO: clean
self.wkas = koming.IoSetKomer(db=self, subkey='wkas.', klas=WellKnownAuthN)
# KSN support datetime stamps and signatures indexed and not-indexed
# all ksn kdts (key state datetime serializations) maps said to date-time
# TODO: clean
self.kdts = subing.CesrSuber(db=self, subkey='kdts.', klas=coring.Dater)
# all key state messages. Maps key state said to serialization. ksns are
# KeyStateRecords so use ._asdict or ._asjson as appropriate
# use .kdts, .ksgs, and .kcgs for datetimes and signatures
# TODO: clean
self.ksns = koming.Komer(db=self,
klas=KeyStateRecord,
subkey='ksns.')
# key state SAID database for successfully saved key state notices
# maps key=(prefix, aid) to val=said of key state
# TODO: clean
self.knas = subing.CesrSuber(db=self, subkey='knas.', klas=coring.Diger)
# Watcher watched SAID database for successfully saved watched AIDs for a watcher
# maps key=(cid, aid, oid) to val=said of rpy message
# TODO: clean
self.wwas = subing.CesrSuber(db=self, subkey='wwas.', klas=coring.Diger)
# config loaded oobis to be processed asynchronously, keyed by oobi URL
# TODO: clean
self.oobis = koming.Komer(db=self,
subkey='oobis.',
klas=OobiRecord,
sep=">") # Use seperator not allowed in URLs so no splitting occurs.
# escrow OOBIs that failed to load, retriable, keyed by oobi URL
self.eoobi = koming.Komer(db=self,
subkey='eoobi.',
klas=OobiRecord,
sep=">") # Use seperator not allowed in URLs so no splitting occurs.
# OOBIs with outstand client requests.
self.coobi = koming.Komer(db=self,
subkey='coobi.',
klas=OobiRecord,
sep=">") # Use seperator not allowed in URLs so no splitting occurs.
# Resolved OOBIs (those that have been processed successfully for this database.
# TODO: clean
self.roobi = koming.Komer(db=self,
subkey='roobi.',
klas=OobiRecord,
sep=">") # Use seperator not allowed in URLs so no splitting occurs.
# Well known OOBIs that are to be used for mfa against a resolved OOBI.
# TODO: clean
self.woobi = koming.Komer(db=self,
subkey='woobi.',
klas=OobiRecord,
sep=">") # Use seperator not allowed in URLs so no splitting occurs.
# Well known OOBIs that are to be used for mfa against a resolved OOBI.
# TODO: clean
self.moobi = koming.Komer(db=self,
subkey='moobi.',
klas=OobiRecord,
sep=">") # Use seperator not allowed in URLs so no splitting occurs.
# Multifactor well known OOBI auth records to process. Keys by controller URL
# TODO: clean
self.mfa = koming.Komer(db=self,
subkey='mfa.',
klas=OobiRecord,
sep=">") # Use seperator not allowed in URLs so no splitting occurs.
# Resolved multifactor well known OOBI auth records. Keys by controller URL
# TODO: clean
self.rmfa = koming.Komer(db=self,
subkey='rmfa.',
klas=OobiRecord,
sep=">") # Use seperator not allowed in URLs so no splitting occurs.
# JSON schema SADs keys by the SAID
# TODO: clean
self.schema = subing.SchemerSuber(db=self,
subkey='schema.')
# Field values for contact information for remote identifiers. Keyed by prefix/field
# TODO: clean
self.cfld = subing.Suber(db=self,
subkey="cfld.")
# Global settings for the Habery environment
self.hbys = subing.Suber(db=self, subkey='hbys.')
# Signed contact data, keys by prefix
# TODO: clean
self.cons = subing.Suber(db=self,
subkey="cons.")
# Transferable signatures on contact data
# TODO: clean
self.ccigs = subing.CesrSuber(db=self, subkey='ccigs.', klas=coring.Cigar)
# Blinded media for contact information for remote identifiers.
# CatCesrSuber with TypeMedia format: (Noncer=SAID, Noncer=UUID, Labeler=MIME, Texter=data)
self.imgs = subing.CatCesrSuber(db=self, subkey='imgs.',
klas=(coring.Noncer, coring.Noncer,
coring.Labeler, coring.Texter))
# Field values for identifier information for local identifiers. Keyed by prefix/field
# TODO: clean
self.ifld = subing.Suber(db=self,
subkey="ifld.")
# Signed identifier data, keys by prefix
# TODO: clean
self.sids = subing.Suber(db=self,
subkey="sids.")
# Transferable signatures on identifier data
# TODO: clean
self.icigs = subing.CesrSuber(db=self, subkey='icigs.', klas=coring.Cigar)
# Blinded media for identifier information for local identifiers.
# CatCesrSuber with TypeMedia format: (Noncer=SAID, Noncer=UUID, Labeler=MIME, Texter=data)
self.iimgs = subing.CatCesrSuber(db=self, subkey='iimgs.',
klas=(coring.Noncer, coring.Noncer,
coring.Labeler, coring.Texter))
# Delegation escrow dbs #
# delegated partial witness escrow
self.dpwe = subing.SerderSuber(db=self, subkey='dpwe.')
# delegated unanchored escrow
self.dune = subing.SerderSuber(db=self, subkey='dune.')
# delegate publication escrow for sending delegator info to my witnesses
self.dpub = subing.SerderSuber(db=self, subkey='dpub.')
# completed group delegated AIDs
# TODO: clean
self.cdel = subing.CesrOnSuber(db=self, subkey='cdel.',
klas=coring.Diger)
# multisig sig embed payload SAID mapped to containing exn messages across group multisig participants
# TODO: clean
self.meids = subing.CesrIoSetSuber(db=self, subkey="meids.", klas=coring.Diger)
# multisig sig embed payload SAID mapped to group multisig participants AIDs
# TODO: clean
self.maids = subing.CesrIoSetSuber(db=self, subkey="maids.", klas=coring.Prefixer)
# KRAM cache type — key: expression string, value: drift and lag params
self.kramCTYP = koming.Komer(db=self, subkey='ctyp.',
klas=CacheTypeRecord)
# KRAM message cache — key: (AID, MID), value: msg datetime, drift, lags
self.kramMSGC = koming.Komer(db=self, subkey='msgc.',
klas=MsgCacheRecord)
# KRAM transactioned message cache — key: (AID, XID, MID), value: datetimes, drift, lags
self.kramTMSC = koming.Komer(db=self, subkey='tmsc.',
klas=TxnMsgCacheRecord)
# KRAM partially signed multi-key message key (AID.MID) mapped to associated message (SerderKERI)
self.kramPMKM = subing.SerderSuber(db=self, subkey='pmkm.')
# KRAM partially signed multi-key signature key (AID.MID) mapped to associated signatures
self.kramPMKS = subing.CesrIoSetSuber(db=self, subkey='pmks.', klas=indexing.Siger)
# KRAM partially signed multi-key sender key state key (AID.MID) mapped to SN and event SAID
self.kramPMSK = subing.CatCesrSuber(db=self, subkey='pmsk.', klas=(coring.Number, coring.Diger))
# KRAM partially signed multi-key non-authenticator attachments
# trqs: trans receipt quadruples (prefixer, number, diger, siger)
self.kramTRQS = subing.CatCesrIoSetSuber(db=self, subkey='trqs.',
klas=(coring.Prefixer, coring.Number,
coring.Diger, indexing.Siger))
# tsgs: trans last sig groups (prefixer, number, diger, siger) — stored per-siger
self.kramTSGS = subing.CatCesrIoSetSuber(db=self, subkey='tsgs.',
klas=(coring.Prefixer, coring.Number,
coring.Diger, indexing.Siger))
# sscs: first seen seal couples (number, diger) issuing or delegating
self.kramSSCS = subing.CatCesrIoSetSuber(db=self, subkey='sscs.',
klas=(coring.Number, coring.Diger))
# ssts: source seal triples (prefixer, number, diger) issued or delegated
self.kramSSTS = subing.CatCesrIoSetSuber(db=self, subkey='ssts.',
klas=(coring.Prefixer, coring.Number,
coring.Diger))
# frcs: first seen replay couples (number, dater)
self.kramFRCS = subing.CatCesrIoSetSuber(db=self, subkey='frcs.',
klas=(coring.Number, coring.Dater))
# tdcs: typed digest seal couples (verser, diger)
self.kramTDCS = subing.CatCesrIoSetSuber(db=self, subkey='tdcs.',
klas=(coring.Verser, coring.Diger))
# ptds: pathed streams (raw bytes)
self.kramPTDS = subing.IoSetSuber(db=self, subkey='ptds.')
# bsqs: blind state quadruples (diger, noncer, noncer, labeler)
self.kramBSQS = subing.CatCesrIoSetSuber(db=self, subkey='bsqs.',
klas=(coring.Diger, coring.Noncer,
coring.Noncer, coring.Labeler))
# bsss: bound state sextuples (diger, noncer, noncer, labeler, number, noncer)
self.kramBSSS = subing.CatCesrIoSetSuber(db=self, subkey='bsss.',
klas=(coring.Diger, coring.Noncer,
coring.Noncer, coring.Labeler,
coring.Number, coring.Noncer))
# tmqs: type media quadruples (diger, noncer, labeler, texter)
self.kramTMQS = subing.CatCesrIoSetSuber(db=self, subkey='tmqs.',
klas=(coring.Diger, coring.Noncer,
coring.Labeler, coring.Texter))
self.reload()
return self.env
[docs]
def reload(self):
"""
Reload stored prefixes and Kevers from .habs
"""
# Check migrations to see if this database is up to date. Error otherwise
if not self.current:
raise DatabaseError(f"Database migrations must be run. DB version {self.version}; current {__version__}")
removes = []
for keys, data in self.habs.getTopItemIter():
if (ksr := self.states.get(keys=data.hid)) is not None:
try:
from ..core.eventing import Kever
kever = Kever(state=ksr,
db=self,
local=True)
except MissingEntryError as ex: # no kel event for keystate
removes.append(keys) # remove from .habs
continue
self.kevers[kever.prefixer.qb64] = kever
self.prefixes.add(kever.prefixer.qb64)
if data.mid: # group hab
self.groups.add(data.hid)
elif data.mid is None: # in .habs but no corresponding key state and not a group so remove
removes.append(keys) # no key state or KEL event for .hab record
for keys in removes: # remove bare .habs records
self.habs.rem(keys=keys)
[docs]
def migrate(self):
""" Run all migrations required
Run all migrations that are required from the current version of database up to the current version
of the software that have not already been run.
Sets the version of the database to the current version of the software after successful completion
of required migrations
"""
from ..core import coring
escrows_cleared = False
for (version, migrations) in MIGRATIONS:
# Only run migration if current source code version is at or below the migration version
ver = semver.VersionInfo.parse(__version__)
ver_no_prerelease = semver.Version(ver.major, ver.minor, ver.patch)
if self.version is not None and semver.compare(version, str(ver_no_prerelease)) > 0:
print(
f"Skipping migration {version} as higher than the current KERI version {__version__}")
continue
# Skip migrations already run - where version less than (-1) or equal to (0) database version
# Strip prerelease from DB version to avoid lexicographic comparison bugs (#820)
if self.version is not None and semver.compare(version, _strip_prerelease(self.version)) != 1:
continue
# Clear all escrows before first migration to prevent old key
# format crashes (e.g. qnfs keys without insertion-order suffix).
# Uses .trim() which bypasses key parsing. See #863.
if not escrows_cleared:
self._trimAllEscrows()
escrows_cleared = True
print(f"Migrating database v{self.version} --> v{version}")
for migration in migrations:
modName = f"keri.db.migrations.{migration}"
if self.migs.get(keys=(migration,)) is not None:
continue
mod = importlib.import_module(modName)
try:
print(f"running migration {modName}")
mod.migrate(self)
except Exception as e:
print(f"\nAbandoning migration {migration} at version {version} with error: {e}")
return
self.migs.pin(keys=(migration,), val=coring.Dater())
# update database version after successful migration
self.version = version
self.version = __version__
def _trimAllEscrows(self):
"""Trim all escrow databases via low-level .trim().
Safe for old key formats that would crash higher-level iterators
(e.g., qnfs keys without insertion-order suffix from pre-1.2.0).
Called at the beginning of migration per spec call guidance.
See: https://github.com/WebOfTrust/keripy/issues/863
"""
escrows = [
self.ures, self.vres, self.pses, self.pwes, self.ooes,
self.qnfs, self.uwes, self.misfits, self.delegables,
self.pdes, self.udes, self.rpes, self.ldes, self.epsd,
self.eoobi, self.dpub, self.gpwe, self.gdee, self.dpwe,
self.gpse, self.epse, self.dune,
]
total = 0
for escrow in escrows:
count = escrow.cnt()
if count > 0:
escrow.trim()
total += count
if total > 0:
print(f"Cleared {total} escrow entries before migration")
[docs]
def clearEscrows(self):
"""
Clear all escrows
"""
for escrow in [self.ures, self.vres, self.pses, self.pwes, self.ooes,
self.qnfs, self.uwes,
self.qnfs, self.misfits, self.delegables, self.pdes,
self.udes, self.rpes, self.ldes, self.epsd, self.eoobi,
self.dpub, self.gpwe, self.gdee, self.dpwe, self.gpse,
self.epse, self.dune]:
count = escrow.cntAll()
escrow.trim()
logger.info(f"KEL: Cleared {count} escrows from ({escrow}")
@property
def current(self):
""" Current property determines if we are at the current database migration state.
If the database version matches the library version return True
If the current database version is behind the current library version, check for migrations
- If there are migrations to run, return False
- If there are no migrations to run, reset database version to library version and return True
If the current database version is ahead of the current library version, raise exception
"""
if self.version == __version__:
return True
ver = semver.VersionInfo.parse(__version__)
ver_no_prerelease = semver.Version(ver.major, ver.minor, ver.patch)
# Strip prerelease from DB version to avoid lexicographic comparison bugs (#820)
if self.version is not None and semver.compare(_strip_prerelease(self.version), str(ver_no_prerelease)) == 1:
raise ConfigurationError(
f"Database version={self.version} is ahead of library version={__version__}")
last = MIGRATIONS[-1]
# If we aren't at latest version, but there are no outstanding migrations,
# reset version to latest (rightmost (-1) migration is latest)
if self.migs.get(keys=(last[1][-1],)) is not None:
return True
# We have migrations to run
return False
[docs]
def complete(self, name=None):
""" Returns list of tuples of migrations completed with date of completion
Parameters:
name(str): optional name of migration to check completeness
Returns:
list: tuples of migration,date of completed migration names and the date of completion
"""
migrations = []
if not name:
for version, migs in MIGRATIONS:
# Print entries only for migrations that have been run
# Strip prerelease from DB version to avoid lexicographic comparison bugs (#820)
if self.version is not None and semver.compare(version, _strip_prerelease(self.version)) <= 0:
for mig in migs:
dater = self.migs.get(keys=(mig,))
migrations.append((mig, dater))
else:
for version, migs in MIGRATIONS: # check all migrations for each version
if name not in migs or not self.migs.get(keys=(name,)):
raise ValueError(f"No migration named {name}")
migrations.append((name, self.migs.get(keys=(name,))))
return migrations
[docs]
def clean(self):
"""
Clean database by creating re-verified cleaned cloned copy
and then replacing original with cleaned cloned copy
Database usage should be offline during cleaning as it will be cloned in
readonly mode
"""
from ..core import parsing
# create copy to clone into
with openDB(name=self.name,
temp=False,
headDirPath=self.headDirPath,
perm=self.perm,
clean=True) as copy: # copy is Baser instance
with reopenDB(db=self, reuse=True, readonly=True): # reopen as readonly
if not os.path.exists(self.path):
raise ValueError("Error while cleaning, no orig at {}."
"".format(self.path))
from ..core.eventing import Kevery
kvy = Kevery(db=copy) # promiscuous mode
# Revise in future to NOT parse msgs but to extract the processed
# objects so can pass directly to kvy.processEvent()
# need new method cloneObjAllPreIter()
# process event doesn't capture exceptions so we can more easily
# detect in the cloning that some events did not make it through
psr = parsing.Parser(kvy=kvy, version=Vrsn_1_0)
for msg in self.cloneAllPreIter(): # clone into copy
psr.parseOne(ims=msg)
# This is the list of non-set based databases that are not created as part of event processing.
# for now we are just copying them from self to copy without worrying about being able to
# reprocess them. We need a more secure method in the future
unsecured = ["hbys", "schema", "states", "rpys", "eans", "tops", "cgms", "exns", "erpy",
"kdts", "ksns", "knas", "oobis", "roobi", "woobi", "moobi", "mfa", "rmfa",
"cfld", "cons", "ccigs", "cdel", "migs",
"ifld", "sids", "icigs"]
for name in unsecured:
srcdb = getattr(self, name)
cpydb = getattr(copy, name)
for keys, val in srcdb.getTopItemIter():
cpydb.put(keys=keys, val=val)
# This is the list of set based databases that are not created as part of event processing.
# for now we are just copying them from self to copy without worrying about being able to
# reprocess them. We need a more secure method in the future
sets = ["esigs", "ecigs", "epath", "chas", "reps", "wkas", "meids", "maids"]
for name in sets:
srcdb = getattr(self, name)
cpydb = getattr(copy, name)
for keys, val in srcdb.getTopItemIter():
cpydb.add(keys=keys, val=val)
# Copy imgs (blinded media for remote identifiers)
for keys, val in self.imgs.getTopItemIter():
copy.imgs.pin(keys=keys, val=val)
# Copy iimgs (blinded media for local identifiers)
for keys, val in self.iimgs.getTopItemIter():
copy.iimgs.pin(keys=keys, val=val)
# clone .habs habitat name prefix Komer subdb
# copy.habs = koming.Komer(db=copy, schema=HabitatRecord, subkey='habs.') # copy
for keys, val in self.habs.getTopItemIter():
if val.hid in copy.kevers: # only copy habs that verified
copy.habs.put(keys=keys, val=val)
ns = "" if val.domain is None else val.domain
copy.names.put(keys=(ns, val.name), val=val.hid)
copy.prefixes.add(val.hid)
if val.mid: # a group hab
copy.groups.add(val.hid)
# clone .ends and .locs databases
for (cid, role, eid), val in self.ends.getTopItemIter():
exists = False # only copy if entries in both .ends and .locs
for scheme in ("https", "http", "tcp"): # all supported schemes
lval = self.locs.get(keys=(eid, scheme))
if lval:
exists = True # loc with matching cid and rol
copy.locs.put(keys=(eid, scheme), val=lval)
if exists: # only copy end if has at least one matching loc
copy.ends.put(keys=(cid, role, eid), val=val)
# replace own kevers with copy kevers by clear and copy
# future do this by loading kever from .stts key state subdb
self.kevers.clear()
for pre, kever in copy.kevers.items():
self.kevers[pre] = kever
# replace prefixes with cloned copy prefixes
# clear and clone .prefixes
self.prefixes.clear()
self.prefixes.update(copy.prefixes)
# clear and clone .gids
self.groups.clear()
self.groups.update(copy.groups)
# remove own db directory replace with clean clone copy
if os.path.exists(self.path):
shutil.rmtree(self.path)
dst = shutil.move(copy.path, self.path) # move copy back to orig
if os.path.exists(os.path.join(os.path.sep, "usr", "local", "var", "keri", "clean")):
shutil.rmtree(os.path.join(os.path.sep, "usr", "local", "var", "keri", "clean"))
if not dst: # move failed leave new in place so can manually fix
raise ValueError("Error cloning, unable to move {} to {}."
"".format(copy.path, self.path))
with reopenDB(db=self, reuse=True): # make sure can reopen
if not isinstance(self.env, lmdb.Environment):
raise ValueError("Error cloning, unable to reopen."
"".format(self.path))
# clone success so remove if still there
if os.path.exists(copy.path):
shutil.rmtree(copy.path)
[docs]
def clonePreIter(self, pre, fn=0):
"""
Returns iterator of first seen event messages with attachments for the
identifier prefix pre starting at first seen order number, fn.
Essentially a replay in first seen order with attachments
Parameters:
pre is bytes of itdentifier prefix
fn is int fn to resume replay. Earliset is fn=0
Returns:
msgs (Iterator): over all items with pre starting at fn
"""
if hasattr(pre, 'encode'):
pre = pre.encode("utf-8")
for keys, fn, dig in self.fels.getAllItemIter(keys=pre, on=fn):
try:
msg = self.cloneEvtMsg(pre=pre, fn=fn, dig=dig)
except Exception:
continue # skip this event
yield msg
[docs]
def cloneAllPreIter(self):
"""
Returns iterator of first seen event messages with attachments for all
identifier prefixes starting at key. If key == b'' then start at first
key in databse. Use key to resume replay.
Essentially a replay in first seen order with attachments of entire
set of FELs.
Returns:
msgs (Iterator): over all items in db
"""
for keys, fn, dig in self.fels.getAllItemIter(keys=b'', on=0):
pre = keys[0].encode() if isinstance(keys[0], str) else keys[0]
try:
msg = self.cloneEvtMsg(pre=pre, fn=fn, dig=dig)
except Exception:
continue # skip this event
yield msg
[docs]
def cloneEvtMsg(self, pre, fn, dig):
"""
Clones Event as Serialized CESR Message with Body and attached Foot
Parameters:
pre (bytes): identifier prefix of event
fn (int): first seen number (ordinal) of event
dig (bytes): digest of event
Returns:
bytearray: message body with attachments
"""
from ..core import coring
from ..core.counting import Counter, Codens
msg = bytearray() # message
atc = bytearray() # attachments
dgkey = dgKey(pre, dig) # get message
if not (serder := self.evts.get(keys=(pre, dig))):
raise MissingEntryError("Missing event for dig={}.".format(dig))
msg.extend(serder.raw)
# add indexed signatures to attachments
if not (sigers := self.sigs.get(keys=dgkey)):
raise MissingEntryError("Missing sigs for dig={}.".format(dig))
atc.extend(Counter(code=Codens.ControllerIdxSigs,
count=len(sigers), version=Vrsn_1_0).qb64b)
for siger in sigers:
atc.extend(siger.qb64b)
# add indexed witness signatures to attachments
if wigers := self.wigs.get(keys=dgkey):
atc.extend(Counter(code=Codens.WitnessIdxSigs,
count=len(wigers), version=Vrsn_1_0).qb64b)
for wiger in wigers:
atc.extend(wiger.qb64b)
# add authorizer (delegator/issuer) source seal event couple to attachments
if (duple := self.aess.get(keys=(pre, dig))) is not None:
number, diger = duple
atc.extend(Counter(code=Codens.SealSourceCouples,
count=1, version=Vrsn_1_0).qb64b)
atc.extend(number.qb64b + diger.qb64b)
# add trans endorsement quadruples to attachments not controller
# may have been originally key event attachments or receipted endorsements
if quads := self.vrcs.get(keys=dgkey):
atc.extend(Counter(code=Codens.TransReceiptQuadruples,
count=len(quads), version=Vrsn_1_0).qb64b)
for pre, snu, diger, siger in quads: # adapt to CESR
atc.extend(pre.qb64b)
atc.extend(snu.qb64b)
atc.extend(diger.qb64b)
atc.extend(siger.qb64b)
# add nontrans endorsement couples to attachments not witnesses
# may have been originally key event attachments or receipted endorsements
if coups := self.rcts.get(keys=dgkey):
atc.extend(Counter(code=Codens.NonTransReceiptCouples,
count=len(coups), version=Vrsn_1_0).qb64b)
for prefixer, cigar in coups:
atc.extend(prefixer.qb64b)
atc.extend(cigar.qb64b)
# add first seen replay couple to attachments
if not (dater := self.dtss.get(keys=dgkey)):
raise MissingEntryError("Missing datetime for dig={}.".format(dig))
atc.extend(Counter(code=Codens.FirstSeenReplayCouples,
count=1, version=Vrsn_1_0).qb64b)
atc.extend(coring.Number(num=fn, code=coring.NumDex.Huge).qb64b) # may not need to be Huge
atc.extend(dater.qb64b)
# prepend pipelining counter to attachments
if len(atc) % 4:
raise ValueError("Invalid attachments size={}, nonintegral"
" quadlets.".format(len(atc)))
pcnt = Counter(code=Codens.AttachmentGroup,
count=(len(atc) // 4), version=Vrsn_1_0).qb64b
msg.extend(pcnt)
msg.extend(atc)
return msg
[docs]
def cloneDelegation(self, kever):
"""
Recursively clone delegation chain from AID of Kever if one exits.
Parameters:
kever (Kever): Kever from which to clone the delegator's AID.
"""
if kever.delegated and kever.delpre in self.kevers:
dkever = self.kevers[kever.delpre]
yield from self.cloneDelegation(dkever)
for dmsg in self.clonePreIter(pre=kever.delpre, fn=0):
yield dmsg
[docs]
def fetchAllSealingEventByEventSeal(self, pre, seal, sn=0):
"""
Search through a KEL for the event that contains a specific anchored
SealEvent type of provided seal but in dict form and is also fully
witnessed. Searchs from sn forward (default = 0).Searches all events in
KEL of pre including disputed and/or superseded events.
Returns the Serder of the first event with the anchored SealEvent seal,
None if not found
Parameters:
pre (bytes|str): identifier of the KEL to search
seal (dict): dict form of Seal of any type SealEvent to find in anchored
seals list of each event
sn (int): beginning sn to search
"""
from ..core.structing import SealEvent
if tuple(seal) != SealEvent._fields: # wrong type of seal
return None
seal = SealEvent(**seal) #convert to namedtuple
for srdr in self.getEvtPreIter(pre=pre, sn=sn): # includes disputed & superseded
for eseal in srdr.seals or []: # or [] for seals 'a' field missing
if tuple(eseal) == SealEvent._fields:
eseal = SealEvent(**eseal) # convert to namedtuple
if seal == eseal and self.fullyWitnessed(srdr):
return srdr
return None
# use alias here until can change everywhere for backwards compatibility
findAnchoringSealEvent = fetchAllSealingEventByEventSeal # alias
[docs]
def fetchLastSealingEventByEventSeal(self, pre, seal, sn=0):
"""
Search through a KEL for the last event at any sn but that contains a
specific anchored event seal of namedtuple SealEvent type that matches
the provided seal in dict form and is also fully witnessed.
Searchs from provided sn forward (default = 0).
Searches only last events in KEL of pre so does not include disputed
and/or superseded events.
Returns:
srdr (Serder): instance of the first event with the matching
anchoring SealEvent seal,
None if not found
Parameters:
pre (bytes|str): identifier of the KEL to search
seal (dict): dict form of Seal of any type SealEvent to find in anchored
seals list of each event
sn (int): beginning sn to search
"""
from ..core.structing import SealEvent
if tuple(seal) != SealEvent._fields: # wrong type of seal
return None
seal = SealEvent(**seal) #convert to namedtuple
for srdr in self.getEvtLastPreIter(pre=pre, sn=sn): # no disputed or superseded
for eseal in srdr.seals or []: # or [] for seals 'a' field missing
if tuple(eseal) == SealEvent._fields:
eseal = SealEvent(**eseal) # convert to namedtuple
if seal == eseal and self.fullyWitnessed(srdr):
return srdr
return None
[docs]
def fetchLastSealingEventBySeal(self, pre, seal, sn=0):
"""Only searches last event at any sn therefore does not search
any disputed or superseded events.
Search through last event at each sn in KEL for the event that contains
an anchored Seal with same Seal type as provided seal but in dict form.
Searchs from sn forward (default = 0).
Returns the Serder of the first found event with the anchored Seal seal,
None if not found
Parameters:
pre (bytes|str): identifier of the KEL to search
seal (dict): dict form of Seal of any type to find in anchored
seals list of each event
sn (int): beginning sn to search
"""
# create generic Seal namedtuple class using keys from provided seal dict
Seal = namedtuple('Seal', list(seal)) # matching type
for srdr in self.getEvtLastPreIter(pre=pre, sn=sn): # only last evt at sn
for eseal in srdr.seals or []: # or [] for seals 'a' field missing
if tuple(eseal) == Seal._fields: # same type of seal
eseal = Seal(**eseal) #convert to namedtuple
if seal == eseal and self.fullyWitnessed(srdr):
return srdr
return None
[docs]
def signingMembers(self, pre: str):
""" Find signing members of a multisig group aid.
Using the pubs index to find members of a signing group
Parameters:
pre (str): qb64 identifier prefix to find members
Returns:
list: qb64 identifier prefixes of signing members for provided aid
"""
if (habord := self.habs.get(keys=(pre,))) is None:
return None
return habord.smids
[docs]
def rotationMembers(self, pre: str):
""" Find rotation members of a multisig group aid.
Using the digs index to lookup member pres of a group aid
Parameters:
pre (str): qb64 identifier prefix to find members
Returns:
list: qb64 identifier prefixes of rotation members for provided aid
"""
if (habord := self.habs.get(keys=(pre,))) is None:
return None
return habord.rmids
[docs]
def fullyWitnessed(self, serder):
""" Verify the witness threshold on the event
Parameters:
serder (Serder): event serder to validate witness threshold
Returns:
"""
# Verify fully receipted, because this witness may have persisted before all receipts
# have been gathered if this ius a witness for serder.pre
# get unique verified wigers and windices lists from wigers list
wigers = self.wigs.get(keys=(serder.preb, serder.saidb))
kever = self.kevers[serder.pre]
toad = kever.toader.num
return not len(wigers) < toad
[docs]
def resolveVerifiers(self, pre=None, sn=0, dig=None):
"""
Returns the Tholder and Verfers for the provided identifier prefix.
Default pre is own .pre
Parameters:
pre(str) is qb64 str of bytes of identifier prefix.
sn(int) is the sequence number of the est event
dig(str) is qb64 str of digest of est event
"""
from ..core import coring
prefixer = coring.Prefixer(qb64=pre)
if prefixer.transferable:
# receipted event and receipter in database so get receipter est evt
# retrieve dig of last event at sn of est evt of receipter.
sdig = self.kels.getLast(keys=prefixer.qb64b, on=sn)
if sdig is None:
# receipter's est event not yet in receipters's KEL
raise ValidationError("key event sn {} for pre {} is not yet in KEL"
"".format(sn, pre))
sdig = sdig.encode("utf-8")
# retrieve last event itself of receipter est evt from sdig
sserder = self.evts.get(keys=(prefixer.qb64b, bytes(sdig)))
# assumes db ensures that sserder must not be none because sdig was in KE
if dig is not None and not sserder.compare(said=dig): # endorser's dig not match event
raise ValidationError("Bad proof sig group at sn = {}"
" for ksn = {}."
"".format(sn, sserder.sad))
verfers = sserder.verfers
tholder = sserder.tholder
else:
verfers = [coring.Verfer(qb64=pre)]
tholder = coring.Tholder(sith="1")
return tholder, verfers
[docs]
def getEvtPreIter(self, pre, sn=0):
"""
Returns iterator of event messages without attachments
in sn order from the KEL of identifier prefix pre.
Essentially a replay of all event messages without attachments
for each sn from the KEL of pre including superseded duplicates
Parameters:
pre (bytes|str): identifier prefix
sn (int): sequence number (default 0) to begin interation
"""
if hasattr(pre, 'encode'):
pre = pre.encode("utf-8")
for dig in self.kels.getAllIter(keys=pre, on=sn):
try:
if not (serder := self.evts.get(keys=(pre, dig))):
raise MissingEntryError("Missing event for dig={}.".format(dig))
except Exception:
continue # skip this event
yield serder # event as Serder
[docs]
def getEvtLastPreIter(self, pre, sn=0):
"""
Returns iterator of event messages without attachments
in sn order from the KEL of identifier prefix pre.
Essentially a replay of all event messages without attachments
for each sn from the KEL of pre including superseded duplicates
Parameters:
pre (bytes|str): identifier prefix
sn (int): sequence number (default 0) to begin interation
"""
if hasattr(pre, 'encode'):
pre = pre.encode("utf-8")
for dig in self.kels.getLastIter(keys=pre, on=sn):
try:
if not (serder := self.evts.get(keys=(pre, dig) )):
raise MissingEntryError("Missing event for dig={}.".format(dig))
except Exception:
continue # skip this event
yield serder # event as Serder
[docs]
class BaserDoer(doing.Doer):
"""
Basic Baser Doer ( LMDB Database )
Attributes: (inherited)
done (bool): completion state:
True means completed
Otherwise incomplete. Incompletion maybe due to close or abort.
Attributes:
.baser is Baser or LMDBer subclass
Properties: (inherited)
.tyme is float relative cycle time of associated Tymist .tyme obtained
via injected .tymth function wrapper closure.
.tymth is function wrapper closure returned by Tymist .tymeth() method.
When .tymth is called it returns associated Tymist .tyme.
.tymth provides injected dependency on Tymist tyme base.
.tock is float, desired time in seconds between runs or until next run,
non negative, zero means run asap
Properties:
Methods:
.wind injects ._tymth dependency from associated Tymist to get its .tyme
.__call__ makes instance callable
Appears as generator function that returns generator
.do is generator method that returns generator
.enter is enter context action method
.recur is recur context action method or generator method
.exit is exit context method
.close is close context method
.abort is abort context method
Hidden:
._tymth is injected function wrapper closure returned by .tymen() of
associated Tymist instance that returns Tymist .tyme. when called.
._tock is hidden attribute for .tock property
"""
[docs]
def __init__(self, baser, **kwa):
"""
Inherited Parameters:
tymist is Tymist instance
tock is float seconds initial value of .tock
Parameters:
baser is Baser instance
"""
super(BaserDoer, self).__init__(**kwa)
self.baser = baser
def enter(self, *, temp=None):
""""""
if not self.baser.opened:
self.baser.reopen() # reopen(temp=temp)
def exit(self):
""""""
self.baser.close(clear=self.baser.temp)