Source code for keri.db.basing

# -*- encoding: utf-8 -*-
"""
KERI
keri.db.basing module
"""
import importlib
import os
import shutil
from collections import namedtuple
from contextlib import contextmanager
import lmdb
import semver
from ordered_set import OrderedSet as oset

from hio.base import doing
from hio.help import ogler

from keri import __version__
from .dbing import LMDBer, dgKey, openLMDB
from ..kering import (MissingEntryError, DatabaseError,
                      ConfigurationError, ValidationError,
                      Vrsn_1_0, Vrsn_2_0)
from ..recording import (KeyStateRecord, EventSourceRecord,
                         HabitatRecord, TopicsRecord,
                         OobiRecord, EndpointRecord,
                         LocationRecord, ObservedRecord,
                         CacheTypeRecord, TxnMsgCacheRecord,
                         MsgCacheRecord, WellKnownAuthN)


# Module-level logger obtained from hio's shared ogler (imported above).
logger = ogler.getLogger()


def _strip_prerelease(version_str):
    """Strip prerelease and build metadata from a semver string.

    Semver compares alphanumeric prerelease identifiers lexicographically,
    so 'dev4' > 'dev10' (because '4' > '1'). Stripping prerelease ensures
    dev releases within the same version cycle compare as equal.
    See: https://github.com/WebOfTrust/keripy/issues/820

    Parameters:
        version_str (str): semver version string, e.g. '1.2.0-dev4+build.7'

    Returns:
        str: 'major.minor.patch' of version_str with any prerelease and
            build metadata removed, e.g. '1.2.0'

    Raises:
        ValueError: propagated from semver when version_str is not a valid
            semver string
    """
    # Use semver.Version consistently; VersionInfo is only the deprecated
    # compatibility alias for the same class. finalize_version() returns a
    # new Version holding just major, minor and patch.
    return str(semver.Version.parse(version_str).finalize_version())


# Ordered table of database migrations.
# Each entry is a (version, names) tuple where version is a semver string and
# names is a list of migration identifiers associated with that version.
# NOTE(review): presumably the names are migration module names run when
# upgrading to that version -- confirm against the migration runner, which is
# not visible in this part of the file.
MIGRATIONS = [
    ("0.6.8", ["hab_data_rename"]),
    ("1.0.0", ["add_key_and_reg_state_schemas"]),
    ("1.2.0", ["rekey_habs"])
]


# ToDo XXXX maybe
'''
class komerdict(dict):
    """
    Subclass of dict that has db as attribute and employs read through cache
    from db Baser.stts of kever states to reload kever from state in database
    when not found in memory as dict item.

    Add method .localGroup(pre) that answers whether a given pre is a group
    hab pre.

    Todo: add a wrapper decorator to update attributes on the class that
    injects instance attributes when the class is instantiated. One of the
    injected parameters is a function that maps the returned Komer to an
    object class.
    The parameters are subdb (must be Komer) and a function that maps the
    retrieved dataclass record from the database to a class instance. If no
    mapping function is given then just return the dataclass record as value.
    """

'''


class statedict(dict):
    """
    Dict subclass holding a reference to a database in .db and acting as a
    read-through cache of kevers.

    When an item is not found in memory, item access looks up the key state
    record in .db.states and, when found, builds a Kever from it, stores the
    Kever in this dict and returns it.

    Attributes:
        db: database instance providing .states, or None when unassigned
    """
    __slots__ = ('db')  # no .__dict__ just for db reference

    def __init__(self, *pa, **kwa):
        """
        Initialize as a plain dict from pa and kwa with .db unassigned (None).
        """
        super().__init__(*pa, **kwa)
        self.db = None

    def __getitem__(self, k):
        """
        Returns item at key k, reading through to .db.states on a miss.

        Raises:
            KeyError: when k is not in memory and either .db is falsy, there
                is no key state record for k, or creating the Kever raises
                MissingEntryError
        """
        try:
            return super().__getitem__(k)
        except KeyError as ex:
            if not self.db:
                raise ex  # no database to read through, reraise KeyError

            ksr = self.db.states.get(keys=k)
            if ksr is None:
                raise ex  # no key state record in database, reraise KeyError

            try:
                # imported here not at module level
                from ..core.eventing import Kever
                kever = Kever(state=ksr, db=self.db)
            except MissingEntryError:  # no kel event for keystate
                raise ex  # reraise KeyError

            self.__setitem__(k, kever)  # cache in memory for next access
            return kever

    def __contains__(self, k):
        """
        Returns True when k is in memory or can be loaded via item access,
        False otherwise.
        """
        if super().__contains__(k):
            return True

        try:
            self.__getitem__(k)
        except KeyError:
            return False
        return True

    def get(self, k, default=None):
        """
        Override of dict get method

        Parameters:
            k (str): key for dict
            default: value to return when k is not in the in-memory dict

        Returns:
            value at k when k is already in the in-memory dict else default
        """
        if super().__contains__(k):
            return self.__getitem__(k)
        return default
def openDB(*, cls=None, name="test", **kwa):
    """
    Returns contextmanager generated by openLMDB but with Baser as the
    default class.

    Parameters:
        cls (type | None): class passed to openLMDB. None means use Baser
        name (str): name passed through to openLMDB
        **kwa: additional keyword arguments forwarded unchanged to openLMDB
    """
    # Baser can't be the default parameter value because the class is defined
    # below this function. Identity test so a class with custom equality
    # can't be mistaken for None.
    if cls is None:
        cls = Baser
    return openLMDB(cls=cls, name=name, **kwa)
@contextmanager
def reopenDB(db, clear=False, **kwa):
    """
    Context manager wrapper for LMDB DB instances.
    Reopens db on entry and closes db on exit.

    Parameters:
        db (LMDBer): instance with LMDB environment at .env
        clear (bool): passed to both db.reopen and db.close.
                      True means clear directory after close
        **kwa: additional keyword arguments forwarded to db.reopen

    Yields:
        db.env of the reopened db

    Usage:

    with reopenDB(baser) as env:
        env.  ....

    """
    try:
        db.reopen(clear=clear, **kwa)
        env = db.env
        yield env
    finally:
        # runs even when db.reopen or the with body raises
        db.close(clear=clear)
# Name of the environment variable that Baser.__init__ reads with os.getenv;
# when set, its value is converted with int() and assigned to .MapSize.
KERIBaserMapSizeKey = "KERI_BASER_MAP_SIZE"
[docs] class Baser(LMDBer): """ Baser sets up named sub databases with Keri Event Logs within main database Attributes: see superclass LMDBer for inherited attributes kevers (dbdict): read-through cache of Kever instances indexed by identifier prefix qb64 prefixes (OrderedSet): local prefixes corresponding to habitats for this db groups (OrderedSet): group hab identifier prefixes for this db .evts is named subDB instance of SerderSuber whose values are serialized key events subkey 'evts.' dgKey (prefix + digest) DB is keyed by identifier prefix plus digest of serialized event Only one value per DB key is allowed .fels is named subDB instance of OnSuber for first seen event logs (FEL) as indices mapping first-seen ordinal fn to event digests. Actual serialized key events are stored in .evts by SAID digest Indexed in first-seen accepted order for replay and cloning. subkey 'fels.' Key: identifier prefix + monotonically increasing fn. Value: qb64 str of event digest used to lookup event in .evts. Only one value per DB key is allowed. Append-only ordering of accepted first-seen events. .kels is named subDB instance of OnIoDupSuber for key event logs as indices mapping composite key "<pre><sep><on>" to serialized key event digests. Actual serialized key events are stored in .evts by SAID digest. subkey 'kels.' Key: identifier prefix + sequence number. Value: qb64 digest used to lookup event in .evts. More than one value per DB key is allowed. .dtss is named subDB instance of CesrSuber (klas=Dater) for datetime stamps of when the event was first escrowed and then later first seen by log. Used for escrow timeouts and extended validation. subkey 'dtss.' dgKey (prefix + digest) Value: Dater instance Only one value per DB key is allowed. .aess is named subDB instance of CatCesrSuber (klas=(Number, Diger)) for authorizing event source seal couples that map digest of key event to seal source couple of authorizer's (delegator or issuer) event. subkey 'aess.' 
dgKey (prefix + digest) Value: (Number, Diger) tuple; Number serialized as Huge (fixed 24-char), used to lookup authorizer's source event in .kels. Only one value per DB key is allowed. .sigs is named subDB instance of CesrIoSetSuber (klas=Siger) for fully qualified indexed event signatures from the controller. subkey 'sigs.' dgKey (prefix + digest) More than one value per DB key is allowed. .wigs is named subDB instance of CesrIoSetSuber (klas=Siger) for indexed witness signatures of events that may come directly or be derived from a witness receipt message. Witnesses always have nontransferable identifier prefixes. The index is the offset of the witness into the witness list of the most recent establishment event wrt the receipted event. subkey 'wigs.' dgKey (prefix + digest) More than one value per DB key is allowed. .rcts is named subDB instance of CatCesrIoSetSuber (klas=(Prefixer, Cigar)) for event receipt couplets from nontransferable signers. These are endorsements from nontransferable signers who are not witnesses May be watchers or other Each entry is a (Prefixer, Cigar) duple. subkey 'rcts.' dgKey (prefix + digest) Multiple values per key stored as ordered set (duplicates ignored, insertion order preserved). .ures is named subDB instance of CatCesrIoSetSuber (klas=(Diger, Prefixer, Cigar)) for unverified event receipt escrowed triples from nontransferable signers. Each triple is (receipted event digest, receiptor prefix, receipt signature). Used to escrow receipt couples until the receipted event appears. subkey 'ures.' snKey (prefix + sequence number) More than one value per DB key is allowed. .vrcs is named subDB instance of CatCesrIoSetSuber (klas=(Prefixer, Number, Diger, Siger)) for verified transferable- validator receipt quadruples. Each stored value is a typed CESR tuple (Prefixer, Number, Diger, Siger) representing a validator's AID, its latest establishment-event sequence number, digest, and its indexed signature over the event. 
Values preserved in insertion order. Represents fully validated receipts moved out of escrow. subkey 'vrcs.' dgKey (prefix + digest) Multiple values per key stored as ordered set. .vres is named subDB instance of CatCesrIoSetSuber for escrowed transferable-receipt quintuples. Each value is a typed CESR tuple (Diger, Prefixer, Number, Diger, Siger) representing a validator's receipt escrow entry. Holds unverified transferable receipts until validated and moved into .vrcs. subkey 'vres.' snKey (prefix + sequence number) Values stored in insertion order. .pses is named subDB instance of OnIoDupSuber for partially-signed event escrows under composite keys "<pre><sep><on>". Tracks events with at least one verified signature but not yet fully validated due to missing signatures or dependent events. subkey 'pses.' Key: identifier prefix + sequence number. Values stored in insertion order. .pwes is named subDB instance of OnIoDupSuber for partially witnessed key event escrows under composite keys "<pre><sep><on>" to serialized event digest. Escrows events with verified signatures but not yet verified witness receipts. subkey 'pwes.' Key: identifier prefix + sequence number. More than one value per DB key is allowed. .pdes is named subDB instance of OnIoDupSuber for partially delegated key event escrows that map prefix + sequence number to serialized event digest. Used in conjunction with .udes which escrows the associated seal source couple. subkey 'pdes.' snKey (prefix + sequence number) More than one value per DB key is allowed. .udes is named subDB instance of CatCesrSuber (klas=(Number, Diger)) for unverified delegation seal source couple escrows that map (prefix, digest) of delegated event to delegating seal source couple (sn, dig) that provides the source delegator event seal. Each couple is (Number(num=sn).qb64b, Diger.qb64b) used to lookup the source event in the delegator's KEL. Once accepted, entries move into .aess. subkey 'udes.' 
dgKey (prefix + digest) Only one value per DB key is allowed. .uwes is named subDB instance of B64OnIoSetSuber for unverified event indexed escrowed couples from witness signers. Each couple is (edig, wig) where edig is receipted event digest and wig is the indexed witness signature derived from the witness nontrans prefix and offset into the witness list of the latest establishment event. subkey 'uwes.' Key: receipted event controller prefix + sequence number. Multiple values per key are stored in insertion order as a set. .ooes is named subDB instance of OnIoDupSuber for out-of-order event escrows under composite keys "<pre><sep><on>". Tracks events whose prior event has not yet been accepted into the KEL. subkey 'ooes.' Key: identifier prefix + sequence number. Values stored in insertion order. .dels is named subDB instance of OnIoDupSuber for duplicitous event log tables that map identifier prefix plus sequence number to serialized event digests. subkey 'dels.' snKey (prefix + sequence number) Values are qb64 digests used to lookup event in .evts. More than one value per DB key is allowed (insertion ordered). .ldes is named subDB instance of OnIoDupSuber for likely duplicitous escrowed event tables that map identifier prefix plus sequence number to serialized event digests. subkey 'ldes.' snKey (prefix + sequence number) Values are qb64 digests used to lookup event in .evts. More than one value per DB key is allowed (insertion ordered). .qnfs is named subDB instance of IoSetSuber for queued not-first-seen event escrows. Maps (prefix, said) to event digest. subkey 'qnfs.' dupsort=True More than one value per DB key is allowed. .fons is named subDB instance of CesrSuber (klas=Number) mapping prefix and digest to fn value (first seen ordinal number) of the associated event. Given pre and event digest, retrieve fn here then fetch event from .fels. Ensures any event looked up this way was first seen at some point, even if later superseded by a recovery rotation. 
Direct lookup in .evts could return escrowed events that may never have been accepted as first seen. subkey 'fons.' dgKey (prefix + digest) Only one value per DB key is allowed. .migs is named subDB instance of CesrSuber (klas=Dater) tracking completed migrations. Maps migration module name to the Dater timestamp of when it was run. subkey 'migs.' Key: migration name str. Only one value per DB key is allowed. .vers is named subDB instance of Suber storing the current database schema version string. subkey 'vers.' .esrs is named subDB instance of Komer (schema=EventSourceRecord) tracking the source of each event. When .local is Truthy the event was sourced in a protected way (generated locally or via a protected path). When .local is Falsey the event was NOT sourced in a protected way. The value of .local determines what validation logic to run. Used to track source when processing escrows that would otherwise be decoupled from the original source of the event. subkey 'esrs.' dgKey (prefix + digest) Only one value per DB key is allowed. .misfits is named subDB instance of OnIoSetSuber for misfit escrows. Events with remote (nonlocal) sources that are inappropriate (i.e. would be dropped) unless promoted to local source via extra after-the-fact authentication. Escrow processing determines if and how to promote event source to local and then reprocess. subkey 'mfes.' snKey (prefix + sequence number) Value: qb64b digest of event. .delegables is named subDB instance of IoSetSuber for delegable event escrows of key events with a local delegator that need approval. Approval is via anchoring of the delegated event seal in the delegator's KEL. Event source must be local. A nonlocal (remote) source must first pass through .misfits and be promoted to local. subkey 'dees.' snKey (prefix + sequence number) Value: qb64b digest of event. .states is named subDB instance of Komer (schema=KeyStateRecord) mapping a prefix to its latest key state. 
Used as read-through cache backing .kevers to reload Kever instances from persistent storage. subkey 'stts.' Key: identifier prefix. Only one value per DB key is allowed. .wits is named subDB instance of CesrIoSetSuber (klas=Prefixer) storing the current witness set for an identifier. subkey 'wits.' Key: identifier prefix. Multiple values per key (one per witness). .habs is named subDB instance of Komer (schema=HabitatRecord) mapping habitat names to habitat application state including identifier prefix. subkey 'habs.' Key: habitat name str. Only one value per DB key is allowed. .names is named subDB instance of Suber (sep='^') mapping (namespace, name) to identifier prefix. Provides namespace-scoped name lookup for habitats. subkey 'names.' Key: namespace + '^' + name. .sdts is named subDB instance of CesrSuber (klas=Dater) mapping SAD SAID to Dater CESR serialization of ISO-8601 datetime (sad date-time stamp). subkey 'sdts.' Key: said (bytes) of SAD. Only one value per DB key is allowed. .ssgs is named subDB instance of CesrIoSetSuber (klas=Siger) for SAD transferable indexed signatures. Maps quadruple key (diger.qb64, prefixer.qb64, number.qb64, diger.qb64) to Siger of the transferable signer's signature. Diger is the SAID of the SAD; prefixer, number, and diger indicate the key state establishment event for the signer. subkey 'ssgs.' Key: join(diger.qb64b, prefixer.qb64b, number.qb64b, diger.qb64b) Multiple values per key (one per signer, insertion ordered). .scgs is named subDB instance of CatCesrIoSetSuber (klas=(Verfer, Cigar)) for SAD nontransferable signatures. Maps SAD SAID to (Verfer, Cigar) couple for each nontransferable signer. For nontransferable signers, qb64 of Verfer equals Prefixer. subkey 'scgs.' Key: said (bytes) of SAD. Multiple values per key (one per nontransferable signer, insertion ordered). .rpys is named subDB instance of SerderSuber for reply messages. Maps reply SAID to serialization of the reply message (versioned SAD). 
Use .sdts, .ssgs, and .scgs for associated datetimes and signatures. subkey 'rpys.' Key: said bytes. Only one value per DB key is allowed. .rpes is named subDB instance of CesrIoSetSuber (klas=Diger) for reply escrows. Maps reply route to Diger of the escrowed reply message. Routes such as '/end/role' and '/loc/scheme'. subkey 'rpes.' Key: route bytes. Multiple values per key. .eans is named subDB instance of CesrSuber (klas=Diger) for endpoint role authorizations. Maps cid.role.eid to SAID of the reply SAD that authN by controller cid authZ endpoint provider eid in the given role. Routes /end/role/add and /end/role/cut to nullify. subkey 'eans.' Key: cid.role.eid. Only one value per DB key is allowed. .lans is named subDB instance of CesrSuber (klas=Diger) for location authorizations. Maps eid.scheme to SAID of the reply SAD that authN by endpoint provider eid designates scheme URL. Route /loc/scheme; null URL nullifies. subkey 'lans.' Key: eid.scheme. Only one value per DB key is allowed. .ends is named subDB instance of Komer (schema=EndpointRecord) mapping (cid, role, eid) to EndpointRecord attributes about endpoint authorization. cid is controller prefix, role is endpoint role (e.g. watcher), eid is controller prefix of endpoint provider. Data extracted from reply /end/role/add or /end/role/cut. subkey 'ends.' Key: cid.role.eid. .locs is named subDB instance of Komer (schema=LocationRecord) mapping endpoint prefix eid and network location scheme to endpoint location details. Data extracted from reply /loc/scheme. subkey 'locs.' Key: eid.scheme. .obvs is named subDB instance of Komer (schema=ObservedRecord) for observed OIDs by watcher. Maps (cid, aid, oid) to ObservedRecord. subkey 'obvs.' Key: cid.aid.oid. .tops is named subDB instance of Komer (schema=TopicsRecord) mapping witness identifier prefix to the topic index of the last recieved mailbox message. subkey 'witm.' Key: witness prefix identifier. 
.gpse is named subDB instance of CatCesrIoSetSuber (klas=(Number, Diger)) for group multisig partial signature escrows. subkey 'gpse.' Multiple values per key. .gdee is named subDB instance of CatCesrIoSetSuber (klas=(Number, Diger)) for group multisig delegate escrows. subkey 'gdee.' Multiple values per key. .gpwe is named subDB instance of CatCesrIoSetSuber (klas=(Number, Diger)) for group multisig partial witness escrows. subkey 'gdwe.' Multiple values per key. .cgms is named subDB instance of CesrSuber (klas=Diger) for completed group multisig events. Maps key to Diger of completed event. subkey 'cgms.' Only one value per DB key is allowed. .epse is named subDB instance of SerderSuber for exchange message partial signature escrows. Maps key to serialized Serder of the escrowed exchange message. subkey 'epse.' .epsd is named subDB instance of CesrSuber (klas=Dater) for exchange message partial signature escrow datetimes. Maps key to Dater timestamp of the escrowed message. subkey 'epsd.' .exns is named subDB instance of SerderSuber for accepted exchange messages. Maps key to serialized Serder of the exchange message. subkey 'exns.' .erpy is named subDB instance of CesrSuber (klas=Saider) as a forward pointer to a provided reply message associated with an exchange message. subkey 'erpy.' Only one value per DB key is allowed. .esigs is named subDB instance of CesrIoSetSuber (klas=Siger) for exchange message transferable indexed signatures. subkey 'esigs.' Multiple values per key. .ecigs is named subDB instance of CatCesrIoSetSuber (klas=(Verfer, Cigar)) for exchange message nontransferable signatures. Maps key to (Verfer, Cigar) couples. subkey 'ecigs.' Multiple values per key. .epath is named subDB instance of IoSetSuber for exchange message pathed attachments. subkey 'epath.' Multiple values per key. .essrs is named subDB instance of CesrIoSetSuber (klas=Texter) for exchange message event source records. subkey 'essrs.' Multiple values per key. 
.chas is named subDB instance of CesrIoSetSuber (klas=Diger) for accepted signed 12-word challenge response exn messages. Keyed by prefix of signer. subkey 'chas.' Multiple values per key. .reps is named subDB instance of CesrIoSetSuber (klas=Diger) for successful signed 12-word challenge response exn messages. Keyed by prefix of signer. subkey 'reps.' Multiple values per key. .wkas is named subDB instance of IoSetKomer (schema=WellKnownAuthN) for authorized well-known OOBIs. subkey 'wkas.' Multiple values per key. .kdts is named subDB instance of CesrSuber (klas=Dater) mapping key state SAID to ISO-8601 datetime stamp (ksn date-time stamp). subkey 'kdts.' Key: said (bytes). Only one value per DB key is allowed. .ksns is named subDB instance of Komer (schema=KeyStateRecord) for key state notice messages. Maps key state SAID to KeyStateRecord. Use .kdts for associated datetimes and signatures. subkey 'ksns.' .knas is named subDB instance of CesrSuber (klas=Diger) for key state SAID records of successfully saved key state notices. Maps (prefix, aid) to SAID of key state. subkey 'knas.' Only one value per DB key is allowed. .wwas is named subDB instance of CesrSuber (klas=Diger) for watcher watched SAID records. Maps (cid, aid, oid) to SAID of the reply message for successfully saved watched AIDs. subkey 'wwas.' Only one value per DB key is allowed. .oobis is named subDB instance of Komer (schema=OobiRecord, sep='>') for configured OOBIs to be processed asynchronously. Keyed by OOBI URL. sep='>' prevents splitting as '>' is not valid in URLs. subkey 'oobis.' .eoobi is named subDB instance of Komer (schema=OobiRecord, sep='>') for OOBIs that failed to load and are pending retry. Keyed by OOBI URL. subkey 'eoobi.' .coobi is named subDB instance of Komer (schema=OobiRecord, sep='>') for OOBIs with outstanding client requests. Keyed by OOBI URL. subkey 'coobi.' 
.roobi is named subDB instance of Komer (schema=OobiRecord, sep='>') for resolved OOBIs that have been successfully processed. Keyed by OOBI URL. subkey 'roobi.' .woobi is named subDB instance of Komer (schema=OobiRecord, sep='>') for well-known OOBIs used for MFA against a resolved OOBI. Keyed by OOBI URL. subkey 'woobi.' .moobi is named subDB instance of Komer (schema=OobiRecord, sep='>') for multifactor well-known OOBI records. Keyed by OOBI URL. subkey 'moobi.' .mfa is named subDB instance of Komer (schema=OobiRecord, sep='>') for multifactor well-known OOBI auth records pending processing. Keyed by controller URL. subkey 'mfa.' .rmfa is named subDB instance of Komer (schema=OobiRecord, sep='>') for resolved multifactor well-known OOBI auth records. Keyed by controller URL. subkey 'rmfa.' .schema is named subDB instance of SchemerSuber storing JSON schema SADs keyed by SAID of the schema. subkey 'schema.' .cfld is named subDB instance of Suber for contact field values for remote identifiers. Keyed by prefix/field. subkey 'cfld.' .hbys is named subDB instance of Suber for global settings of the Habery environment. subkey 'hbys.' .cons is named subDB instance of Suber for signed contact data. Keyed by prefix. subkey 'cons.' .ccigs is named subDB instance of CesrSuber (klas=Cigar) for transferable signatures on contact data. Keyed by prefix. subkey 'ccigs.' Only one value per DB key is allowed. .imgs is raw LMDB sub database for chunked image data for contact information of remote identifiers. Keyed by prefix/chunk-index. subkey b'imgs.' Raw bytes values; accessed directly via env.open_db. .ifld is named subDB instance of Suber for identifier field values for local identifiers. Keyed by prefix/field. subkey 'ifld.' .sids is named subDB instance of Suber for signed local identifier data. Keyed by prefix. subkey 'sids.' .icigs is named subDB instance of CesrSuber (klas=Cigar) for transferable signatures on local identifier data. Keyed by prefix. subkey 'icigs.' 
Only one value per DB key is allowed. .iimgs is raw LMDB sub database for chunked image data for local identifier information. Keyed by prefix/chunk-index. subkey b'iimgs.' Raw bytes values; accessed directly via env.open_db. .dpwe is named subDB instance of SerderSuber for delegated partial witness escrows. Maps key to serialized Serder of the escrowed delegated event. subkey 'dpwe.' .dune is named subDB instance of SerderSuber for delegated unanchored escrows. Maps key to serialized Serder of the unanchored delegated event awaiting delegation anchor. subkey 'dune.' .dpub is named subDB instance of SerderSuber for delegate publication escrows used to send delegator info to the delegate's witnesses. subkey 'dpub.' .cdel is named subDB instance of CesrOnSuber (klas=Diger) for completed group delegated AIDs. Maps ordinal key to Diger of the completed delegation event. subkey 'cdel.' .meids is named subDB instance of CesrIoSetSuber (klas=Diger) mapping multisig embed payload SAID to the SAIDs of the exn messages that contained it. Aggregates identical message bodies across group multisig participants reaching consensus on events or credentials. subkey 'meids.' Multiple values per key. .maids is named subDB instance of CesrIoSetSuber (klas=Prefixer) mapping multisig embed payload SAID to the AIDs of the group multisig participants that contributed it. subkey 'maids.' Multiple values per key. .kramCTYP is named subDB instance of Komer (schema=CacheTypeRecord) for KRAM cache type records. Maps expression string to drift and lag parameters. subkey 'ctyp.' .kramMSGC is named subDB instance of Komer (schema=MsgCacheRecord) for KRAM message cache. Maps (AID, MID) to message datetime, drift, and lag values. subkey 'msgc.' .kramTMSC is named subDB instance of Komer (schema=TxnMsgCacheRecord) for KRAM transactioned message cache. Maps (AID, XID, MID) to datetimes, drift, and lag values. subkey 'tmsc.' 
.kramPMKM is named subDB instance of SerderSuber for KRAM partially signed multi-key messages. Maps (AID, MID) key to the associated SerderKERI message. subkey 'pmkm.' .kramPMKS is named subDB instance of CesrIoSetSuber (klas=Siger) for KRAM partially signed multi-key signatures. Maps (AID, MID) key to associated Siger instances. subkey 'pmks.' Multiple values per key. .kramPMSK is named subDB instance of CatCesrSuber (klas=(Number, Diger)) for KRAM partially signed multi-key sender key state records. Maps (AID, MID) key to (sn, event SAID) couple identifying the sender's key state. subkey 'pmsk.' Only one value per DB key is allowed. .kramTRQS is named subDB instance of CatCesrIoSetSuber for KRAM partially signed multi-key trans receipt quadruple attachments. subkey 'trqs.' DB is keyed by (AID, MID): sender identifier prefix plus message SAID Value is (Prefixer, Number, Diger, Siger) tuple. Sourced from parser kwa key 'trqs'. Multiple values per key stored as ordered set (duplicates ignored). Entries persist until removed by the KRAM pruner. .kramTSGS is named subDB instance of CatCesrIoSetSuber for KRAM partially signed multi-key trans last sig group attachments. Each group is stored per-siger as a flat (Prefixer, Seqner, Saider, Siger) tuple. subkey 'tsgs.' DB is keyed by (AID, MID): sender identifier prefix plus message SAID Value is (Prefixer, Number, Diger, Siger) tuple. Sourced from parser kwa key 'tsgs'. Multiple values per key stored as ordered set (duplicates ignored). Entries persist until removed by the KRAM pruner. .kramSSCS is named subDB instance of CatCesrIoSetSuber for KRAM partially signed multi-key first seen seal couple attachments from issuing or delegating events. subkey 'sscs.' DB is keyed by (AID, MID): sender identifier prefix plus message SAID Value is (Number, Diger) tuple. Sourced from parser kwa key 'sscs'. Multiple values per key stored as ordered set (duplicates ignored). Entries persist until removed by the KRAM pruner. 
.kramSSTS is named subDB instance of CatCesrIoSetSuber for KRAM partially signed multi-key source seal triple attachments from issued or delegated events. subkey 'ssts.' DB is keyed by (AID, MID): sender identifier prefix plus message SAID Value is (Prefixer, Number, Diger) tuple. Sourced from parser kwa key 'ssts'. Multiple values per key stored as ordered set (duplicates ignored). Entries persist until removed by the KRAM pruner. .kramFRCS is named subDB instance of CatCesrIoSetSuber for KRAM partially signed multi-key first seen replay couple attachments. subkey 'frcs.' DB is keyed by (AID, MID): sender identifier prefix plus message SAID Value is (Number, Dater) tuple. Sourced from parser kwa key 'frcs'. Multiple values per key stored as ordered set (duplicates ignored). Entries persist until removed by the KRAM pruner. .kramTDCS is named subDB instance of CatCesrIoSetSuber for KRAM partially signed multi-key typed digest seal couple attachments. subkey 'tdcs.' DB is keyed by (AID, MID): sender identifier prefix plus message SAID Value is (Verser, Diger) tuple. Sourced from parser kwa key 'tdcs'. Multiple values per key stored as ordered set (duplicates ignored). Entries persist until removed by the KRAM pruner. .kramPTDS is named subDB instance of IoSetSuber for KRAM partially signed multi-key pathed stream attachments. subkey 'ptds.' DB is keyed by (AID, MID): sender identifier prefix plus message SAID Value is raw bytes of pathed CESR stream. Sourced from parser kwa key 'ptds'. Multiple values per key stored as ordered set (duplicates ignored). Entries persist until removed by the KRAM pruner. .kramBSQS is named subDB instance of CatCesrIoSetSuber for KRAM partially signed multi-key blind state quadruple attachments. subkey 'bsqs.' DB is keyed by (AID, MID): sender identifier prefix plus message SAID Value is (Diger, Noncer, Noncer, Labeler) tuple. Sourced from parser kwa key 'bsqs'. Multiple values per key stored as ordered set (duplicates ignored). 
Entries persist until removed by the KRAM pruner. .kramBSSS is named subDB instance of CatCesrIoSetSuber for KRAM partially signed multi-key bound state sextuple attachments. subkey 'bsss.' DB is keyed by (AID, MID): sender identifier prefix plus message SAID Value is (Diger, Noncer, Noncer, Labeler, Number, Noncer) tuple. Sourced from parser kwa key 'bsss'. Multiple values per key stored as ordered set (duplicates ignored). Entries persist until removed by the KRAM pruner. .kramTMQS is named subDB instance of CatCesrIoSetSuber for KRAM partially signed multi-key type media quadruple attachments. subkey 'tmqs.' DB is keyed by (AID, MID): sender identifier prefix plus message SAID Value is (Diger, Noncer, Labeler, Texter) tuple. Sourced from parser kwa key 'tmqs'. Multiple values per key stored as ordered set (duplicates ignored). Entries persist until removed by the KRAM pruner. Properties: kevers (statedict): read through cache of kevers of states for KELs in db """
def __init__(self, headDirPath=None, reopen=False, **kwa):
    """
    Setup named sub databases.

    Parameters:
        headDirPath (str | None): optional head directory pathname for main
            database, passed to the superclass init
        reopen (bool): True means database will be reopened by this init,
            passed to the superclass init
        **kwa: remaining keyword arguments forwarded unchanged to the
            superclass init, such as
            name (str): directory path name differentiator for main database
            temp (bool): True means open in temporary directory, clear on
                close. Otherwise open persistent directory, do not clear
                on close
            mode (int): numeric os dir permissions for database directory

    Raises:
        ValueError: when the KERI_BASER_MAP_SIZE environment variable is set
            to a value that int() can not convert
    """
    self.prefixes = oset()  # should change to hids for hab ids
    self.groups = oset()  # group hab ids

    # read through cache of kevers needs a reference back to this db
    cache = statedict()
    cache.db = self
    self._kevers = cache

    # optional override of map size from the environment
    envMapSize = os.getenv(KERIBaserMapSizeKey)
    if envMapSize is not None:
        try:
            self.MapSize = int(envMapSize)
        except ValueError:
            logger.error("KERI_BASER_MAP_SIZE must be an integer value >1!")
            raise

    super(Baser, self).__init__(headDirPath=headDirPath, reopen=reopen, **kwa)
@property
def kevers(self):
    """
    Read-only accessor for the kevers cache held in ._kevers

    Returns:
        the object stored in ._kevers (assigned in .__init__)
    """
    return self._kevers
def reopen(self, **kwa):
    """
    Open the main database via the parent class, then (re)create every
    named sub database attribute, and finally call .reload().

    Parameters:
        **kwa: keyword arguments forwarded unchanged to the parent .reopen

    Returns:
        self.env as left by the parent .reopen

    Notes:

    dupsort=True for sub DB means allow unique (key,pair) duplicates at a key.
    Duplicate means that is more than one value at a key but not a redundant
    copies a (key,value) pair per key. In other words the pair (key,value)
    must be unique both key and value in combination.
    Attempting to put the same (key,value) pair a second time does
    not add another copy.

    Duplicates are inserted in lexicographic order by value, insertion order.

    """
    # Imported here rather than at module level
    # (presumably to avoid circular imports -- confirm)
    from . import koming, subing
    from ..core import coring, indexing

    super(Baser, self).reopen(**kwa)

    # Create by opening first time named sub DBs within main DB instance
    # Names end with "." as sub DB name must include a non Base64 character
    # to avoid namespace collisions with Base64 identifier prefixes.

    self.evts = subing.SerderSuber(db=self, subkey='evts.')
    self.fels = subing.OnSuber(db=self, subkey='fels.')
    self.kels = subing.OnIoDupSuber(db=self, subkey='kels.')
    self.dtss = subing.CesrSuber(db=self, subkey='dtss.', klas=coring.Dater)
    self.aess = subing.CatCesrSuber(db=self, subkey='aess.',
                                    klas=(coring.Number, coring.Diger))
    # NOTE: the parentheses around indexing.Siger below do not make a tuple;
    # the value passed is the class itself, same as for .wigs
    self.sigs = subing.CesrIoSetSuber(db=self, subkey='sigs.',
                                      klas=(indexing.Siger))
    self.wigs = subing.CesrIoSetSuber(db=self, subkey='wigs.',
                                      klas=indexing.Siger)
    self.rcts = subing.CatCesrIoSetSuber(db=self, subkey="rcts.",
                                         klas=(coring.Prefixer, coring.Cigar))
    self.ures = subing.CatCesrIoSetSuber(db=self, subkey='ures.',
                                         klas=(coring.Diger, coring.Prefixer, coring.Cigar))
    self.vrcs = subing.CatCesrIoSetSuber(db=self, subkey='vrcs.',
                                         klas=(coring.Prefixer, coring.Number, coring.Diger, indexing.Siger))
    self.vres = subing.CatCesrIoSetSuber(db=self, subkey='vres.',
                                         klas=(coring.Diger, coring.Prefixer, coring.Number, coring.Diger, indexing.Siger))
    self.pses = subing.OnIoDupSuber(db=self, subkey='pses.')
    self.pwes = subing.OnIoDupSuber(db=self, subkey='pwes.')
    self.pdes = subing.OnIoDupSuber(db=self, subkey='pdes.')
    self.udes = subing.CatCesrSuber(db=self, subkey='udes.',
                                    klas=(coring.Number, coring.Diger))
    self.uwes = subing.B64OnIoSetSuber(db=self, subkey='uwes.')
    self.ooes = subing.OnIoDupSuber(db=self, subkey='ooes.')
    self.dels = subing.OnIoDupSuber(db=self, subkey='dels.')
    self.ldes = subing.OnIoDupSuber(db=self, subkey='ldes.')
    self.qnfs = subing.IoSetSuber(db=self, subkey="qnfs.", dupsort=True)

    # events as ordered by first seen ordinals
    self.fons = subing.CesrSuber(db=self, subkey='fons.', klas=coring.Number)

    # completed migrations: .migrate() pins a Dater here keyed by migration name
    self.migs = subing.CesrSuber(db=self, subkey="migs.", klas=coring.Dater)
    self.vers = subing.Suber(db=self, subkey="vers.")

    # event source local (protected) or non-local (remote not protected)
    self.esrs = koming.Komer(db=self,
                             klas=EventSourceRecord,
                             subkey='esrs.')

    # misfit escrows whose processing may change the .esrs event source record
    self.misfits = subing.OnIoSetSuber(db=self, subkey='mfes.')

    # delegable events escrows. events with local delegator that need approval
    self.delegables = subing.IoSetSuber(db=self, subkey='dees.')

    # Kever state made of KeyStateRecord key states
    # TODO: clean
    self.states = koming.Komer(db=self,
                               klas=KeyStateRecord,
                               subkey='stts.')

    self.wits = subing.CesrIoSetSuber(db=self, subkey="wits.", klas=coring.Prefixer)

    # habitat application state keyed by habitat name, includes prefix
    self.habs = koming.Komer(db=self,
                             subkey='habs.',
                             klas=HabitatRecord, )

    # habitat name database mapping (domain,name) as key to Prefixer
    self.names = subing.Suber(db=self, subkey='names.', sep="^")

    # SAD support datetime stamps and signatures indexed and not-indexed
    # all sad sdts (sad datetime serializations) maps said to date-time
    self.sdts = subing.CesrSuber(db=self, subkey='sdts.', klas=coring.Dater)

    # all sad ssgs (sad indexed signature serializations) maps SAD quadkeys
    # given by quadruple (diger.qb64, prefixer.qb64, seqner.q64, diger.qb64)
    # of reply and trans signer's key state est evt to val Siger for each
    # signature.
    self.ssgs = subing.CesrIoSetSuber(db=self, subkey='ssgs.', klas=indexing.Siger)

    # all sad scgs (sad non-indexed signature serializations) maps SAD SAID
    # to couple (Verfer, Cigar) of nontrans signer of signature in Cigar
    # nontrans qb64 of Prefixer is same as Verfer
    self.scgs = subing.CatCesrIoSetSuber(db=self, subkey='scgs.',
                                         klas=(coring.Verfer, coring.Cigar))

    # all reply messages. Maps reply said to serialization. Replys are
    # versioned sads ( with version string) so use Serder to deserialize and
    # use .sdts, .ssgs, and .scgs for datetimes and signatures
    # TODO: clean
    self.rpys = subing.SerderSuber(db=self, subkey='rpys.')

    # all reply escrows indices of partially signed reply messages. Maps
    # route in reply to single (Diger,) of escrowed reply.
    # Routes such as /end/role /loc/schema
    self.rpes = subing.CesrIoSetSuber(db=self, subkey='rpes.', klas=coring.Diger)

    # auth AuthN/AuthZ by controller at cid of endpoint provider at eid
    # maps key=cid.role.eid to val=diger of end reply
    self.eans = subing.CesrSuber(db=self, subkey='eans.', klas=coring.Diger)

    # auth AuthN/AuthZ by endpoint provider at eid of location at scheme url
    # maps key=cid.role.eid to val=diger of end reply
    self.lans = subing.CesrSuber(db=self, subkey='lans.', klas=coring.Diger)

    # service endpoint identifier (eid) auths keyed by controller cid.role.eid
    # data extracted from reply /end/role/add or /end/role/cut
    self.ends = koming.Komer(db=self, subkey='ends.',
                             klas=EndpointRecord, )

    # service endpoint locations keyed by eid.scheme (endpoint identifier)
    # data extracted from reply loc
    self.locs = koming.Komer(db=self,
                             subkey='locs.',
                             klas=LocationRecord, )

    # observed oids by watcher by cid.aid.oid (endpoint identifier)
    # data extracted from reply loc
    self.obvs = koming.Komer(db=self,
                             subkey='obvs.',
                             klas=ObservedRecord, )

    # index of last retrieved message from witness mailbox
    # TODO: clean
    self.tops = koming.Komer(db=self,
                             subkey='witm.',
                             klas=TopicsRecord, )

    # group partial signature escrow
    self.gpse = subing.CatCesrIoSetSuber(db=self, subkey='gpse.',
                                         klas=(coring.Number, coring.Diger))

    # group delegate escrow
    self.gdee = subing.CatCesrIoSetSuber(db=self, subkey='gdee.',
                                         klas=(coring.Number, coring.Diger))

    # group partial witness escrow
    # NOTE(review): attribute is .gpwe but the subkey is 'gdwe.'. Looks like a
    # historical naming mismatch; the subkey is the persisted sub DB name so
    # confirm before changing it.
    self.gpwe = subing.CatCesrIoSetSuber(db=self, subkey='gdwe.',
                                         klas=(coring.Number, coring.Diger))

    # completed group multisig
    # TODO: clean
    self.cgms = subing.CesrSuber(db=self, subkey='cgms.', klas=coring.Diger)

    # exchange message partial signature escrow
    self.epse = subing.SerderSuber(db=self, subkey="epse.")

    # exchange message PS escrow date time of message
    self.epsd = subing.CesrSuber(db=self, subkey="epsd.", klas=coring.Dater)

    # exchange messages
    # TODO: clean
    self.exns = subing.SerderSuber(db=self, subkey="exns.")

    # Forward pointer to a provided reply message
    # TODO: clean
    self.erpy = subing.CesrSuber(db=self, subkey="erpy.", klas=coring.Saider)

    # exchange message signatures
    # TODO: clean
    self.esigs = subing.CesrIoSetSuber(db=self, subkey='esigs.', klas=indexing.Siger)

    # exchange message signatures
    # TODO: clean
    self.ecigs = subing.CatCesrIoSetSuber(db=self, subkey='ecigs.',
                                          klas=(coring.Verfer, coring.Cigar))

    # exchange pathed attachments
    # TODO: clean
    self.epath = subing.IoSetSuber(db=self, subkey="epath.")

    self.essrs = subing.CesrIoSetSuber(db=self, subkey="essrs.", klas=coring.Texter)

    # accepted signed 12-word challenge response exn messages keys by prefix of signer
    # TODO: clean
    self.chas = subing.CesrIoSetSuber(db=self, subkey='chas.', klas=coring.Diger)

    # successful signed 12-word challenge response exn messages keys by prefix of signer
    # TODO: clean
    self.reps = subing.CesrIoSetSuber(db=self, subkey='reps.', klas=coring.Diger)

    # authorized well known OOBIs
    # TODO: clean
    self.wkas = koming.IoSetKomer(db=self, subkey='wkas.', klas=WellKnownAuthN)

    # KSN support datetime stamps and signatures indexed and not-indexed
    # all ksn kdts (key state datetime serializations) maps said to date-time
    # TODO: clean
    self.kdts = subing.CesrSuber(db=self, subkey='kdts.', klas=coring.Dater)

    # all key state messages. Maps key state said to serialization. ksns are
    # KeyStateRecords so use ._asdict or ._asjson as appropriate
    # use .kdts, .ksgs, and .kcgs for datetimes and signatures
    # TODO: clean
    self.ksns = koming.Komer(db=self,
                             klas=KeyStateRecord,
                             subkey='ksns.')

    # key state SAID database for successfully saved key state notices
    # maps key=(prefix, aid) to val=said of key state
    # TODO: clean
    self.knas = subing.CesrSuber(db=self, subkey='knas.', klas=coring.Diger)

    # Watcher watched SAID database for successfully saved watched AIDs for a watcher
    # maps key=(cid, aid, oid) to val=said of rpy message
    # TODO: clean
    self.wwas = subing.CesrSuber(db=self, subkey='wwas.', klas=coring.Diger)

    # config loaded oobis to be processed asynchronously, keyed by oobi URL
    # TODO: clean
    self.oobis = koming.Komer(db=self,
                              subkey='oobis.',
                              klas=OobiRecord,
                              sep=">")  # Use separator not allowed in URLs so no splitting occurs.

    # escrow OOBIs that failed to load, retriable, keyed by oobi URL
    self.eoobi = koming.Komer(db=self,
                              subkey='eoobi.',
                              klas=OobiRecord,
                              sep=">")  # Use separator not allowed in URLs so no splitting occurs.

    # OOBIs with outstanding client requests.
    self.coobi = koming.Komer(db=self,
                              subkey='coobi.',
                              klas=OobiRecord,
                              sep=">")  # Use separator not allowed in URLs so no splitting occurs.

    # Resolved OOBIs (those that have been processed successfully for this database).
    # TODO: clean
    self.roobi = koming.Komer(db=self,
                              subkey='roobi.',
                              klas=OobiRecord,
                              sep=">")  # Use separator not allowed in URLs so no splitting occurs.

    # Well known OOBIs that are to be used for mfa against a resolved OOBI.
    # TODO: clean
    self.woobi = koming.Komer(db=self,
                              subkey='woobi.',
                              klas=OobiRecord,
                              sep=">")  # Use separator not allowed in URLs so no splitting occurs.

    # Well known OOBIs that are to be used for mfa against a resolved OOBI.
    # TODO: clean
    self.moobi = koming.Komer(db=self,
                              subkey='moobi.',
                              klas=OobiRecord,
                              sep=">")  # Use separator not allowed in URLs so no splitting occurs.

    # Multifactor well known OOBI auth records to process. Keys by controller URL
    # TODO: clean
    self.mfa = koming.Komer(db=self,
                            subkey='mfa.',
                            klas=OobiRecord,
                            sep=">")  # Use separator not allowed in URLs so no splitting occurs.

    # Resolved multifactor well known OOBI auth records. Keys by controller URL
    # TODO: clean
    self.rmfa = koming.Komer(db=self,
                             subkey='rmfa.',
                             klas=OobiRecord,
                             sep=">")  # Use separator not allowed in URLs so no splitting occurs.

    # JSON schema SADs keys by the SAID
    # TODO: clean
    self.schema = subing.SchemerSuber(db=self,
                                      subkey='schema.')

    # Field values for contact information for remote identifiers. Keyed by prefix/field
    # TODO: clean
    self.cfld = subing.Suber(db=self,
                             subkey="cfld.")

    # Global settings for the Habery environment
    self.hbys = subing.Suber(db=self, subkey='hbys.')

    # Signed contact data, keys by prefix
    # TODO: clean
    self.cons = subing.Suber(db=self,
                             subkey="cons.")

    # Transferable signatures on contact data
    # TODO: clean
    self.ccigs = subing.CesrSuber(db=self, subkey='ccigs.', klas=coring.Cigar)

    # Blinded media for contact information for remote identifiers.
    # CatCesrSuber with TypeMedia format: (Noncer=SAID, Noncer=UUID, Labeler=MIME, Texter=data)
    self.imgs = subing.CatCesrSuber(db=self, subkey='imgs.',
                                    klas=(coring.Noncer, coring.Noncer, coring.Labeler, coring.Texter))

    # Field values for identifier information for local identifiers. Keyed by prefix/field
    # TODO: clean
    self.ifld = subing.Suber(db=self,
                             subkey="ifld.")

    # Signed identifier data, keys by prefix
    # TODO: clean
    self.sids = subing.Suber(db=self,
                             subkey="sids.")

    # Transferable signatures on identifier data
    # TODO: clean
    self.icigs = subing.CesrSuber(db=self, subkey='icigs.', klas=coring.Cigar)

    # Blinded media for identifier information for local identifiers.
    # CatCesrSuber with TypeMedia format: (Noncer=SAID, Noncer=UUID, Labeler=MIME, Texter=data)
    self.iimgs = subing.CatCesrSuber(db=self, subkey='iimgs.',
                                     klas=(coring.Noncer, coring.Noncer, coring.Labeler, coring.Texter))

    # Delegation escrow dbs #
    # delegated partial witness escrow
    self.dpwe = subing.SerderSuber(db=self, subkey='dpwe.')

    # delegated unanchored escrow
    self.dune = subing.SerderSuber(db=self, subkey='dune.')

    # delegate publication escrow for sending delegator info to my witnesses
    self.dpub = subing.SerderSuber(db=self, subkey='dpub.')

    # completed group delegated AIDs
    # TODO: clean
    self.cdel = subing.CesrOnSuber(db=self, subkey='cdel.', klas=coring.Diger)

    # multisig sig embed payload SAID mapped to containing exn messages across group multisig participants
    # TODO: clean
    self.meids = subing.CesrIoSetSuber(db=self, subkey="meids.", klas=coring.Diger)

    # multisig sig embed payload SAID mapped to group multisig participants AIDs
    # TODO: clean
    self.maids = subing.CesrIoSetSuber(db=self, subkey="maids.", klas=coring.Prefixer)

    # KRAM cache type — key: expression string, value: drift and lag params
    self.kramCTYP = koming.Komer(db=self, subkey='ctyp.', klas=CacheTypeRecord)

    # KRAM message cache — key: (AID, MID), value: msg datetime, drift, lags
    self.kramMSGC = koming.Komer(db=self, subkey='msgc.', klas=MsgCacheRecord)

    # KRAM transactioned message cache — key: (AID, XID, MID), value: datetimes, drift, lags
    self.kramTMSC = koming.Komer(db=self, subkey='tmsc.', klas=TxnMsgCacheRecord)

    # KRAM partially signed multi-key message key (AID.MID) mapped to associated message (SerderKERI)
    self.kramPMKM = subing.SerderSuber(db=self, subkey='pmkm.')

    # KRAM partially signed multi-key signature key (AID.MID) mapped to associated signatures
    self.kramPMKS = subing.CesrIoSetSuber(db=self, subkey='pmks.', klas=indexing.Siger)

    # KRAM partially signed multi-key sender key state key (AID.MID) mapped to SN and event SAID
    self.kramPMSK = subing.CatCesrSuber(db=self, subkey='pmsk.',
                                        klas=(coring.Number, coring.Diger))

    # KRAM partially signed multi-key non-authenticator attachments
    # trqs: trans receipt quadruples (prefixer, number, diger, siger)
    self.kramTRQS = subing.CatCesrIoSetSuber(db=self, subkey='trqs.',
                                             klas=(coring.Prefixer, coring.Number, coring.Diger, indexing.Siger))

    # tsgs: trans last sig groups (prefixer, number, diger, siger) — stored per-siger
    self.kramTSGS = subing.CatCesrIoSetSuber(db=self, subkey='tsgs.',
                                             klas=(coring.Prefixer, coring.Number, coring.Diger, indexing.Siger))

    # sscs: first seen seal couples (number, diger) issuing or delegating
    self.kramSSCS = subing.CatCesrIoSetSuber(db=self, subkey='sscs.',
                                             klas=(coring.Number, coring.Diger))

    # ssts: source seal triples (prefixer, number, diger) issued or delegated
    self.kramSSTS = subing.CatCesrIoSetSuber(db=self, subkey='ssts.',
                                             klas=(coring.Prefixer, coring.Number, coring.Diger))

    # frcs: first seen replay couples (number, dater)
    self.kramFRCS = subing.CatCesrIoSetSuber(db=self, subkey='frcs.',
                                             klas=(coring.Number, coring.Dater))

    # tdcs: typed digest seal couples (verser, diger)
    self.kramTDCS = subing.CatCesrIoSetSuber(db=self, subkey='tdcs.',
                                             klas=(coring.Verser, coring.Diger))

    # ptds: pathed streams (raw bytes)
    self.kramPTDS = subing.IoSetSuber(db=self, subkey='ptds.')

    # bsqs: blind state quadruples (diger, noncer, noncer, labeler)
    self.kramBSQS = subing.CatCesrIoSetSuber(db=self, subkey='bsqs.',
                                             klas=(coring.Diger, coring.Noncer, coring.Noncer, coring.Labeler))

    # bsss: bound state sextuples (diger, noncer, noncer, labeler, number, noncer)
    self.kramBSSS = subing.CatCesrIoSetSuber(db=self, subkey='bsss.',
                                             klas=(coring.Diger, coring.Noncer, coring.Noncer, coring.Labeler, coring.Number, coring.Noncer))

    # tmqs: type media quadruples (diger, noncer, labeler, texter)
    self.kramTMQS = subing.CatCesrIoSetSuber(db=self, subkey='tmqs.',
                                             klas=(coring.Diger, coring.Noncer, coring.Labeler, coring.Texter))

    # rebuild .kevers, .prefixes and .groups from .habs and .states;
    # raises DatabaseError when .current is False
    self.reload()

    return self.env
def reload(self):
    """
    Rebuild .kevers, .prefixes and .groups from the records in .habs

    For each .habs record with a key state in .states a Kever is built from
    that state and cached. Records whose Kever cannot be built because of a
    MissingEntryError, and records that have neither key state nor a mid,
    are removed from .habs once iteration has finished.

    Raises:
        DatabaseError: when .current is False, i.e. migrations must be run
    """
    # Check migrations to see if this database is up to date. Error otherwise
    if not self.current:
        raise DatabaseError(f"Database migrations must be run. DB version {self.version}; current {__version__}")

    stale = []  # keys of .habs records to delete after the iteration
    for keys, record in self.habs.getTopItemIter():
        state = self.states.get(keys=record.hid)

        if state is None:
            # in .habs but no corresponding key state; drop unless group
            if record.mid is None:
                stale.append(keys)
            continue

        try:
            from ..core.eventing import Kever
            kever = Kever(state=state, db=self, local=True)
        except MissingEntryError:  # no kel event for keystate
            stale.append(keys)
            continue

        self.kevers[kever.prefixer.qb64] = kever
        self.prefixes.add(kever.prefixer.qb64)
        if record.mid:  # group hab
            self.groups.add(record.hid)

    # deferred so .habs is not modified while it is being iterated
    for keys in stale:
        self.habs.rem(keys=keys)
def migrate(self):
    """
    Run every outstanding migration listed in MIGRATIONS

    Walks MIGRATIONS in order. A version entry is skipped when it is newer
    than the library version (prerelease stripped) or not newer than the
    database version (prerelease stripped). Before the first entry that is
    actually processed all escrows are trimmed. Each migration module that
    has no entry in .migs is imported and its migrate(self) is called; on
    success a Dater is pinned in .migs under the migration name. After all
    migrations of an entry the database version is set to that entry's
    version, and after the whole walk to the library version.

    If any migration raises, a message is printed and the method returns
    immediately without touching the database version any further.
    """
    from ..core import coring

    trimmed = False  # escrows are trimmed at most once per call

    for version, names in MIGRATIONS:
        # library version without prerelease/build parts, as a string
        libVersion = _strip_prerelease(__version__)

        # Only run migration if current source code version is at or above
        # the migration version
        if self.version is not None and semver.compare(version, libVersion) > 0:
            print(f"Skipping migration {version} as higher than the current KERI version {__version__}")
            continue

        # Skip migrations already run - where version less than (-1) or
        # equal to (0) database version. Prerelease is stripped from the DB
        # version to avoid lexicographic comparison bugs (#820)
        if self.version is not None:
            if semver.compare(version, _strip_prerelease(self.version)) != 1:
                continue

        # Clear all escrows before first migration to prevent old key
        # format crashes (e.g. qnfs keys without insertion-order suffix).
        # Uses .trim() which bypasses key parsing. See #863.
        if not trimmed:
            self._trimAllEscrows()
            trimmed = True

        print(f"Migrating database v{self.version} --> v{version}")

        for migration in names:
            modName = f"keri.db.migrations.{migration}"
            already = self.migs.get(keys=(migration,))
            if already is not None:
                continue

            module = importlib.import_module(modName)
            try:
                print(f"running migration {modName}")
                module.migrate(self)
            except Exception as e:
                print(f"\nAbandoning migration {migration} at version {version} with error: {e}")
                return

            self.migs.pin(keys=(migration,), val=coring.Dater())

        # update database version after successful migration
        self.version = version

    self.version = __version__
def _trimAllEscrows(self): """Trim all escrow databases via low-level .trim(). Safe for old key formats that would crash higher-level iterators (e.g., qnfs keys without insertion-order suffix from pre-1.2.0). Called at the beginning of migration per spec call guidance. See: https://github.com/WebOfTrust/keripy/issues/863 """ escrows = [ self.ures, self.vres, self.pses, self.pwes, self.ooes, self.qnfs, self.uwes, self.misfits, self.delegables, self.pdes, self.udes, self.rpes, self.ldes, self.epsd, self.eoobi, self.dpub, self.gpwe, self.gdee, self.dpwe, self.gpse, self.epse, self.dune, ] total = 0 for escrow in escrows: count = escrow.cnt() if count > 0: escrow.trim() total += count if total > 0: print(f"Cleared {total} escrow entries before migration")
def clearEscrows(self):
    """
    Clear all escrows

    Each escrow sub database is counted with .cntAll(), emptied with
    .trim() and the number of removed entries is logged at info level.
    Every escrow appears exactly once in the list so that no escrow is
    trimmed and logged a second time with a count of zero.
    """
    # NOTE(review): ._trimAllEscrows counts with .cnt() while this method
    # counts with .cntAll(); confirm which one the sub database classes
    # provide and align the two.
    escrows = [self.ures, self.vres, self.pses, self.pwes, self.ooes,
               self.qnfs, self.uwes, self.misfits, self.delegables,
               self.pdes, self.udes, self.rpes, self.ldes, self.epsd,
               self.eoobi, self.dpub, self.gpwe, self.gdee, self.dpwe,
               self.gpse, self.epse, self.dune]

    for escrow in escrows:
        count = escrow.cntAll()  # count before trimming empties the sub db
        escrow.trim()
        logger.info("KEL: Cleared %s escrows from (%s)", count, escrow)
@property
def current(self):
    """
    True when no database migration is outstanding, False otherwise.

    Decision steps:
        - database version equals the library version: True
        - database version (prerelease stripped) is ahead of the library
          version (prerelease stripped): ConfigurationError is raised
        - the last migration of the last MIGRATIONS entry has an entry in
          .migs: True (the stored database version is not modified here)
        - otherwise: False, there are migrations to run

    Raises:
        ConfigurationError: database version is ahead of library version
    """
    if self.version == __version__:
        return True

    # library version without prerelease/build parts, as a string
    libVersion = _strip_prerelease(__version__)

    # Strip prerelease from DB version to avoid lexicographic comparison bugs (#820)
    if self.version is not None:
        ahead = semver.compare(_strip_prerelease(self.version), libVersion) == 1
        if ahead:
            raise ConfigurationError(
                f"Database version={self.version} is ahead of library version={__version__}")

    # rightmost migration of the rightmost version entry is the latest one;
    # if it has been recorded as run nothing is outstanding
    _, lastNames = MIGRATIONS[-1]
    latest = lastNames[-1]
    return self.migs.get(keys=(latest,)) is not None
def complete(self, name=None):
    """
    Returns list of tuples of migrations completed with date of completion

    Parameters:
        name(str): optional name of migration to check completeness

    Returns:
        list: tuples of (migration name, value stored in .migs for it).
            Without name: one tuple per migration of every MIGRATIONS entry
            whose version is not newer than the database version
            (prerelease stripped); the stored value is None for a migration
            that has no entry in .migs.
            With name: a single tuple for that migration.

    Raises:
        ValueError: when name is given but is not listed in any MIGRATIONS
            entry, or is listed but has no completion entry in .migs
    """
    migrations = []

    if not name:
        for version, migs in MIGRATIONS:
            # Report entries only for versions at or below the DB version.
            # Strip prerelease from DB version to avoid lexicographic
            # comparison bugs (#820)
            if self.version is not None and semver.compare(version, _strip_prerelease(self.version)) <= 0:
                for mig in migs:
                    dater = self.migs.get(keys=(mig,))
                    migrations.append((mig, dater))
        return migrations

    # The name only has to appear in ONE version's migration list. Checking
    # it against every list (as was done before) rejected every name as soon
    # as MIGRATIONS held more than one version.
    if not any(name in migs for _, migs in MIGRATIONS):
        raise ValueError(f"No migration named {name}")

    dater = self.migs.get(keys=(name,))
    if not dater:  # known migration but never recorded as run
        raise ValueError(f"No completed migration named {name}")

    migrations.append((name, dater))
    return migrations
def clean(self):
    """
    Clean database by creating re-verified cleaned cloned copy
    and then replacing original with cleaned cloned copy

    Database usage should be offline during cleaning as it will be cloned in
    readonly mode

    Raises:
        ValueError: when the original database path does not exist, when
            the cleaned copy cannot be moved into place, or when the
            database cannot be reopened afterwards
    """
    from ..core import parsing

    # create copy to clone into
    with openDB(name=self.name,
                temp=False,
                headDirPath=self.headDirPath,
                perm=self.perm,
                clean=True) as copy:  # copy is Baser instance

        with reopenDB(db=self, reuse=True, readonly=True):  # reopen as readonly
            if not os.path.exists(self.path):
                raise ValueError("Error while cleaning, no orig at {}."
                                 "".format(self.path))

            from ..core.eventing import Kevery
            kvy = Kevery(db=copy)  # promiscuous mode

            # Revise in future to NOT parse msgs but to extract the processed
            # objects so can pass directly to kvy.processEvent()
            # need new method cloneObjAllPreIter()
            # process event doesn't capture exceptions so we can more easily
            # detect in the cloning that some events did not make it through
            psr = parsing.Parser(kvy=kvy, version=Vrsn_1_0)
            for msg in self.cloneAllPreIter():  # clone into copy
                psr.parseOne(ims=msg)

            # This is the list of non-set based databases that are not created as part of event processing.
            # for now we are just copying them from self to copy without worrying about being able to
            # reprocess them. We need a more secure method in the future
            unsecured = ["hbys", "schema", "states", "rpys", "eans", "tops", "cgms", "exns", "erpy",
                         "kdts", "ksns", "knas", "oobis", "roobi", "woobi", "moobi", "mfa", "rmfa",
                         "cfld", "cons", "ccigs", "cdel", "migs", "ifld", "sids", "icigs"]

            for name in unsecured:
                srcdb = getattr(self, name)
                cpydb = getattr(copy, name)
                for keys, val in srcdb.getTopItemIter():
                    cpydb.put(keys=keys, val=val)

            # This is the list of set based databases that are not created as part of event processing.
            # for now we are just copying them from self to copy without worrying about being able to
            # reprocess them. We need a more secure method in the future
            sets = ["esigs", "ecigs", "epath", "chas", "reps", "wkas", "meids", "maids"]
            for name in sets:
                srcdb = getattr(self, name)
                cpydb = getattr(copy, name)
                for keys, val in srcdb.getTopItemIter():
                    cpydb.add(keys=keys, val=val)

            # Copy imgs (blinded media for remote identifiers)
            for keys, val in self.imgs.getTopItemIter():
                copy.imgs.pin(keys=keys, val=val)

            # Copy iimgs (blinded media for local identifiers)
            for keys, val in self.iimgs.getTopItemIter():
                copy.iimgs.pin(keys=keys, val=val)

            # clone .habs habitat name prefix Komer subdb
            for keys, val in self.habs.getTopItemIter():
                if val.hid in copy.kevers:  # only copy habs that verified
                    copy.habs.put(keys=keys, val=val)
                    ns = "" if val.domain is None else val.domain
                    copy.names.put(keys=(ns, val.name), val=val.hid)
                    copy.prefixes.add(val.hid)
                    if val.mid:  # a group hab
                        copy.groups.add(val.hid)

            # clone .ends and .locs databases
            for (cid, role, eid), val in self.ends.getTopItemIter():
                exists = False  # only copy if entries in both .ends and .locs
                for scheme in ("https", "http", "tcp"):  # all supported schemes
                    lval = self.locs.get(keys=(eid, scheme))
                    if lval:
                        exists = True  # loc with matching cid and rol
                        copy.locs.put(keys=(eid, scheme), val=lval)
                if exists:  # only copy end if has at least one matching loc
                    copy.ends.put(keys=(cid, role, eid), val=val)

        # replace own kevers with copy kevers by clear and copy
        # future do this by loading kever from .stts key state subdb
        self.kevers.clear()
        for pre, kever in copy.kevers.items():
            self.kevers[pre] = kever

        # replace prefixes with cloned copy prefixes
        # clear and clone .prefixes
        self.prefixes.clear()
        self.prefixes.update(copy.prefixes)
        # clear and clone .gids
        self.groups.clear()
        self.groups.update(copy.groups)

        # remove own db directory replace with clean clone copy
        if os.path.exists(self.path):
            shutil.rmtree(self.path)

        dst = shutil.move(copy.path, self.path)  # move copy back to orig
        # NOTE(review): this removes a hard-coded absolute directory
        # regardless of headDirPath; presumably the default location of the
        # clean copy -- confirm it should not be derived from copy.path
        if os.path.exists(os.path.join(os.path.sep, "usr", "local", "var", "keri", "clean")):
            shutil.rmtree(os.path.join(os.path.sep, "usr", "local", "var", "keri", "clean"))
        if not dst:  # move failed leave new in place so can manually fix
            raise ValueError("Error cloning, unable to move {} to {}."
                             "".format(copy.path, self.path))

        with reopenDB(db=self, reuse=True):  # make sure can reopen
            if not isinstance(self.env, lmdb.Environment):
                # NOTE(review): the message has no {} placeholder so
                # self.path is not included in the raised text
                raise ValueError("Error cloning, unable to reopen."
                                 "".format(self.path))

    # clone success so remove if still there
    if os.path.exists(copy.path):
        shutil.rmtree(copy.path)
def clonePreIter(self, pre, fn=0):
    """
    Generator of first seen event messages with attachments for one
    identifier prefix, i.e. a replay in first seen order.

    Parameters:
        pre (bytes | str): identifier prefix; anything with an .encode
            method is encoded as utf-8 first
        fn (int): first seen ordinal number to resume replay from.
            Earliest is fn=0

    Returns:
        msgs (Iterator): messages as returned by .cloneEvtMsg for every
            item of .fels for pre starting at fn. Items for which
            .cloneEvtMsg raises an Exception are silently skipped.
    """
    prefix = pre.encode("utf-8") if hasattr(pre, 'encode') else pre

    for _, ordinal, digest in self.fels.getAllItemIter(keys=prefix, on=fn):
        try:
            cloned = self.cloneEvtMsg(pre=prefix, fn=ordinal, dig=digest)
        except Exception:
            continue  # skip this event
        else:
            yield cloned
def cloneAllPreIter(self):
    """
    Generator of first seen event messages with attachments for all
    identifier prefixes, i.e. a replay in first seen order of the entire
    set of FELs starting at the first key in .fels.

    Returns:
        msgs (Iterator): messages as returned by .cloneEvtMsg for every
            item in .fels. Items for which .cloneEvtMsg raises an
            Exception are silently skipped.
    """
    for itemKeys, ordinal, digest in self.fels.getAllItemIter(keys=b'', on=0):
        head = itemKeys[0]
        # the prefix is the first key part; encode it when it is a str
        prefix = head.encode() if isinstance(head, str) else head
        try:
            cloned = self.cloneEvtMsg(pre=prefix, fn=ordinal, dig=digest)
        except Exception:
            continue  # skip this event
        else:
            yield cloned
def cloneEvtMsg(self, pre, fn, dig):
    """
    Build the serialized CESR message for one event: the event body followed
    by one attachment group holding all stored attachments.

    Parameters:
        pre (bytes): identifier prefix of event
        fn (int): first seen number (ordinal) of event
        dig (bytes): digest of event

    Returns:
        bytearray: message body with attachments

    Raises:
        MissingEntryError: when the event, its controller signatures or its
            first seen datetime are not in the database
        ValueError: when the attachments are not a whole number of quadlets
    """
    from ..core import coring
    from ..core.counting import Counter, Codens

    def counter(code, count):
        """Return qb64b of a version 1.0 Counter for code and count"""
        return Counter(code=code, count=count, version=Vrsn_1_0).qb64b

    body = bytearray()  # message
    atc = bytearray()  # attachments
    dgkey = dgKey(pre, dig)  # get message

    serder = self.evts.get(keys=(pre, dig))
    if not serder:
        raise MissingEntryError("Missing event for dig={}.".format(dig))
    body.extend(serder.raw)

    # add indexed signatures to attachments
    sigers = self.sigs.get(keys=dgkey)
    if not sigers:
        raise MissingEntryError("Missing sigs for dig={}.".format(dig))
    atc.extend(counter(Codens.ControllerIdxSigs, len(sigers)))
    for siger in sigers:
        atc.extend(siger.qb64b)

    # add indexed witness signatures to attachments
    wigers = self.wigs.get(keys=dgkey)
    if wigers:
        atc.extend(counter(Codens.WitnessIdxSigs, len(wigers)))
        for wiger in wigers:
            atc.extend(wiger.qb64b)

    # add authorizer (delegator/issuer) source seal event couple to attachments
    couple = self.aess.get(keys=(pre, dig))
    if couple is not None:
        number, diger = couple
        atc.extend(counter(Codens.SealSourceCouples, 1))
        atc.extend(number.qb64b + diger.qb64b)

    # add trans endorsement quadruples to attachments not controller
    # may have been originally key event attachments or receipted endorsements
    quads = self.vrcs.get(keys=dgkey)
    if quads:
        atc.extend(counter(Codens.TransReceiptQuadruples, len(quads)))
        for qprefixer, qnumber, qdiger, qsiger in quads:  # adapt to CESR
            atc.extend(qprefixer.qb64b)
            atc.extend(qnumber.qb64b)
            atc.extend(qdiger.qb64b)
            atc.extend(qsiger.qb64b)

    # add nontrans endorsement couples to attachments not witnesses
    # may have been originally key event attachments or receipted endorsements
    coups = self.rcts.get(keys=dgkey)
    if coups:
        atc.extend(counter(Codens.NonTransReceiptCouples, len(coups)))
        for prefixer, cigar in coups:
            atc.extend(prefixer.qb64b)
            atc.extend(cigar.qb64b)

    # add first seen replay couple to attachments
    dater = self.dtss.get(keys=dgkey)
    if not dater:
        raise MissingEntryError("Missing datetime for dig={}.".format(dig))
    atc.extend(counter(Codens.FirstSeenReplayCouples, 1))
    atc.extend(coring.Number(num=fn, code=coring.NumDex.Huge).qb64b)  # may not need to be Huge
    atc.extend(dater.qb64b)

    # prepend pipelining counter to attachments; it counts quadlets so the
    # attachment size must be a multiple of 4
    if len(atc) % 4:
        raise ValueError("Invalid attachments size={}, nonintegral"
                         " quadlets.".format(len(atc)))
    pcnt = counter(Codens.AttachmentGroup, len(atc) // 4)

    body.extend(pcnt)
    body.extend(atc)
    return body
def cloneDelegation(self, kever):
    """
    Generator that recursively replays the delegation chain above kever,
    outermost delegator first.

    Nothing is yielded unless kever is delegated and its delegator prefix
    is a key of .kevers.

    Parameters:
        kever (Kever): Kever from which to clone the delegator's AID.

    Returns:
        msgs (Iterator): messages from .clonePreIter for each delegator in
            the chain
    """
    if not (kever.delegated and kever.delpre in self.kevers):
        return

    # first the ancestors of the delegator, then the delegator itself
    delegator = self.kevers[kever.delpre]
    yield from self.cloneDelegation(delegator)
    yield from self.clonePreIter(pre=kever.delpre, fn=0)
def fetchAllSealingEventByEventSeal(self, pre, seal, sn=0):
    """
    Find the first event in the KEL of pre, searching from sn forward over
    all events including disputed and/or superseded ones, that anchors a
    SealEvent equal to seal and is fully witnessed.

    Parameters:
        pre (bytes|str): identifier of the KEL to search
        seal (dict): dict form of a SealEvent to find in the anchored
            seals list of each event
        sn (int): beginning sn to search

    Returns:
        Serder of the first matching event, or None when seal does not have
        exactly the SealEvent fields in order or no event matches
    """
    from ..core.structing import SealEvent

    fields = SealEvent._fields
    if tuple(seal) != fields:  # wrong type of seal
        return None

    target = SealEvent(**seal)  # convert to namedtuple

    for srdr in self.getEvtPreIter(pre=pre, sn=sn):  # includes disputed & superseded
        for anchored in (srdr.seals or []):  # or [] for seals 'a' field missing
            if tuple(anchored) != fields:
                continue  # some other type of seal
            if target == SealEvent(**anchored) and self.fullyWitnessed(srdr):
                return srdr

    return None
# use alias here until can change everywhere for backwards compatibility findAnchoringSealEvent = fetchAllSealingEventByEventSeal # alias
def fetchLastSealingEventByEventSeal(self, pre, seal, sn=0):
    """
    Search the KEL of pre, from sn forward, for the first event that both
    anchors a SealEvent equal to the provided seal and is fully witnessed.
    Only the last event at each sn is searched, so disputed and/or
    superseded events are not included.

    Returns:
        srdr (Serder): first event found with the matching anchored
            SealEvent seal. None if not found or if the provided seal does
            not have exactly the fields of SealEvent.

    Parameters:
        pre (bytes|str): identifier of the KEL to search
        seal (dict): dict form of a SealEvent seal to find in the anchored
            seals list of each event
        sn (int): beginning sn to search (default 0)

    """
    from ..core.structing import SealEvent

    fields = SealEvent._fields
    if tuple(seal) != fields:  # keys differ so not a SealEvent type seal
        return None

    target = SealEvent(**seal)  # namedtuple form used for comparison

    # getEvtLastPreIter yields no disputed or superseded events
    for serder in self.getEvtLastPreIter(pre=pre, sn=sn):
        anchors = serder.seals or []  # or [] when seals 'a' field missing
        for anchor in anchors:
            if tuple(anchor) != fields:
                continue  # some other type of seal
            if target == SealEvent(**anchor) and self.fullyWitnessed(serder):
                return serder
    return None
def fetchLastSealingEventBySeal(self, pre, seal, sn=0):
    """
    Search the last event at each sn in the KEL of pre, from sn forward, for
    the first event that both anchors a seal equal to the provided seal and
    is fully witnessed. Because only the last event at each sn is searched,
    disputed and/or superseded events are not included.

    A seal in an event matches when it has the same field names in the same
    order as the provided seal and the same values.

    Returns:
        srdr (Serder): first event found with the matching anchored seal.
            None if not found.

    Parameters:
        pre (bytes|str): identifier of the KEL to search
        seal (dict): dict form of a seal of any type to find in the anchored
            seals list of each event
        sn (int): beginning sn to search (default 0)

    """
    # create generic Seal namedtuple class using keys from provided seal dict
    Seal = namedtuple('Seal', list(seal))  # matching type
    # Convert the provided dict too. A dict never compares equal to a
    # namedtuple, so without this conversion no event could ever match.
    target = Seal(**seal)

    for srdr in self.getEvtLastPreIter(pre=pre, sn=sn):  # only last evt at sn
        for eseal in srdr.seals or []:  # or [] for seals 'a' field missing
            if tuple(eseal) != Seal._fields:  # different type of seal
                continue
            if target == Seal(**eseal) and self.fullyWitnessed(srdr):
                return srdr
    return None
def signingMembers(self, pre: str):
    """
    Find signing members of a multisig group aid.

    Looks up the habitat record stored in .habs under pre and returns its
    .smids attribute.

    Parameters:
        pre (str): qb64 identifier prefix to find members

    Returns:
        list: qb64 identifier prefixes of signing members for provided aid,
            None when .habs has no record for pre

    """
    record = self.habs.get(keys=(pre,))
    return None if record is None else record.smids
def rotationMembers(self, pre: str):
    """
    Find rotation members of a multisig group aid.

    Looks up the habitat record stored in .habs under pre and returns its
    .rmids attribute.

    Parameters:
        pre (str): qb64 identifier prefix to find members

    Returns:
        list: qb64 identifier prefixes of rotation members for provided aid,
            None when .habs has no record for pre

    """
    record = self.habs.get(keys=(pre,))
    return None if record is None else record.rmids
def fullyWitnessed(self, serder):
    """
    Verify the witness threshold on the event

    The event may have been persisted before all of its witness receipts
    were gathered, so the witness signatures stored for the event are
    counted against the toad of the kever for serder.pre.

    Parameters:
        serder (Serder): event serder to validate witness threshold

    Returns:
        bool: True when the number of witness signatures stored in .wigs
            for the event is at least the toad, False otherwise

    """
    stored = self.wigs.get(keys=(serder.preb, serder.saidb))
    threshold = self.kevers[serder.pre].toader.num
    return len(stored) >= threshold
def resolveVerifiers(self, pre=None, sn=0, dig=None):
    """
    Returns the Tholder and Verfers for the provided identifier prefix.

    For a non-transferable prefix the single Verfer is built directly from
    pre with a threshold of "1". For a transferable prefix the verfers and
    tholder are taken from the last event at sn in the KEL of pre.

    Returns:
        tuple: (tholder, verfers)

    Raises:
        ValidationError: when there is no event at sn in the KEL of pre, or
            when dig is provided and does not match the said of that event

    Parameters:
        pre (str): qb64 of identifier prefix
            NOTE(review): pre is passed unchanged to Prefixer, no fallback
            for the default of None is applied here -- confirm callers
            always provide it
        sn (int): sequence number of the est event
        dig (str): qb64 of digest of est event, not checked when None

    """
    from ..core import coring

    prefixer = coring.Prefixer(qb64=pre)
    if not prefixer.transferable:
        # key material is the prefix itself with a one-of-one threshold
        verfers = [coring.Verfer(qb64=pre)]
        tholder = coring.Tholder(sith="1")
        return tholder, verfers

    # receipted event and receipter in database so get receipter est evt
    # retrieve dig of last event at sn of est evt of receipter.
    sdig = self.kels.getLast(keys=prefixer.qb64b, on=sn)
    if sdig is None:
        # receipter's est event not yet in receipters's KEL
        raise ValidationError("key event sn {} for pre {} is not yet in KEL"
                              "".format(sn, pre))

    digb = bytes(sdig.encode("utf-8"))
    # retrieve last event itself of receipter est evt from its digest
    # assumes db ensures the event exists because its digest was in the KEL
    sserder = self.evts.get(keys=(prefixer.qb64b, digb))

    if dig is not None and not sserder.compare(said=dig):
        # endorser's dig not match event
        raise ValidationError("Bad proof sig group at sn = {}"
                              " for ksn = {}."
                              "".format(sn, sserder.sad))

    return sserder.tholder, sserder.verfers
def getEvtPreIter(self, pre, sn=0):
    """
    Returns iterator of event messages without attachments
    in sn order from the KEL of identifier prefix pre.
    Essentially a replay of all event messages without attachments
    for each sn from the KEL of pre including superseded duplicates

    Digests whose event is missing from .evts, or whose lookup raises, are
    skipped silently rather than ending the iteration.

    Parameters:
        pre (bytes|str): identifier prefix
        sn (int): sequence number (default 0) to begin iteration

    """
    if hasattr(pre, 'encode'):
        pre = pre.encode("utf-8")  # database keys are bytes

    for dig in self.kels.getAllIter(keys=pre, on=sn):
        try:
            serder = self.evts.get(keys=(pre, dig))
        except Exception:
            continue  # skip this event
        if not serder:
            continue  # missing entry, skip this event
        yield serder  # event as Serder
def getEvtLastPreIter(self, pre, sn=0):
    """
    Returns iterator of event messages without attachments
    in sn order from the KEL of identifier prefix pre.
    Essentially a replay of the last event message without attachments
    at each sn from the KEL of pre, as provided by .kels.getLastIter.

    Digests whose event is missing from .evts, or whose lookup raises, are
    skipped silently rather than ending the iteration.

    Parameters:
        pre (bytes|str): identifier prefix
        sn (int): sequence number (default 0) to begin iteration

    """
    if hasattr(pre, 'encode'):
        pre = pre.encode("utf-8")  # database keys are bytes

    for dig in self.kels.getLastIter(keys=pre, on=sn):
        try:
            serder = self.evts.get(keys=(pre, dig))
        except Exception:
            continue  # skip this event
        if not serder:
            continue  # missing entry, skip this event
        yield serder  # event as Serder
class BaserDoer(doing.Doer):
    """
    Basic Baser Doer ( LMDB Database )

    Reopens its database on enter if it is not already opened and closes it
    on exit.

    Attributes: (inherited)
        done (bool): completion state:
            True means completed
            Otherwise incomplete. Incompletion maybe due to close or abort.

    Attributes:
        .baser is Baser or LMDBer subclass

    Properties: (inherited)
        .tyme is float relative cycle time of associated Tymist .tyme obtained
            via injected .tymth function wrapper closure.
        .tymth is function wrapper closure returned by Tymist .tymeth() method.
            When .tymth is called it returns associated Tymist .tyme.
            .tymth provides injected dependency on Tymist tyme base.
        .tock is float, desired time in seconds between runs or until next run,
            non negative, zero means run asap

    Methods:
        .wind  injects ._tymth dependency from associated Tymist to get its .tyme
        .__call__ makes instance callable
            Appears as generator function that returns generator
        .do is generator method that returns generator
        .enter is enter context action method
        .recur is recur context action method or generator method
        .exit is exit context method
        .close is close context method
        .abort is abort context method

    Hidden:
        ._tymth is injected function wrapper closure returned by .tymen() of
            associated Tymist instance that returns Tymist .tyme. when called.
        ._tock is hidden attribute for .tock property

    """

    def __init__(self, baser, **kwa):
        """
        Inherited Parameters:
            tymist is Tymist instance
            tock is float seconds initial value of .tock

        Parameters:
            baser is Baser instance

        """
        super().__init__(**kwa)
        self.baser = baser

    def enter(self, *, temp=None):
        """
        Enter context action: reopen .baser when it is not already opened.

        Parameters:
            temp: accepted but not used here, .baser.reopen() is called
                without arguments
        """
        if self.baser.opened:
            return
        self.baser.reopen()  # reopen(temp=temp)

    def exit(self):
        """
        Exit context action: close .baser, passing .baser.temp as the clear
        argument of close.
        """
        clear = self.baser.temp
        self.baser.close(clear=clear)