Source code for keri.db.koming

# -*- encoding: utf-8 -*-
"""
KERI
keri.db.koming module

"""
import types
import json
from dataclasses import dataclass
from typing import Type, Union
from collections.abc import Iterable

import cbor2
import msgpack
import lmdb


from . import dbing
from .. import help
from ..core import coring
from ..help import helping

logger = help.ogler.getLogger()



class KomerBase:
    """
    KomerBase is a base class for Komer (Keyspace Object Mapper) subclasses that
    each use a dataclass as the object mapped via serialization to a dber LMDB
    database subclass.
    Each Komer .schema is a dataclass class reference that is used to define
    the fields in each database entry. The base class is not meant to be
    instantiated. Use an instance of one of the subclasses instead.

    Attributes:
        db (dbing.LMDBer): instance of LMDB database manager class
        sdb (lmdb._Database): instance of named sub db lmdb for this Komer
        schema (Type[dataclass]): class reference of dataclass subclass
        kind (str): serialization/deserialization type from coring.Serials
        serializer (types.MethodType): serializer method
        deserializer (types.MethodType): deserializer method
        sep (str): separator for combining keys tuple of strs into key bytes
    """
    Sep = '.'  # separator for combining key iterables

    def __init__(self, db: dbing.LMDBer, *,
                 subkey: str = 'docs.',
                 schema: Type[dataclass],  # class not instance
                 kind: str = coring.Serials.json,
                 dupsort: bool = False,
                 sep: str = None,
                 **kwa):
        """
        Parameters:
            db (dbing.LMDBer): base db
            schema (Type[dataclass]): reference to Class definition for
                dataclass sub class
            subkey (str): LMDB sub database key
            kind (str): serialization/deserialization type
            dupsort (bool): True means enable duplicates at each key
                            False (default) means do not enable duplicates at
                            each key
            sep (str): separator to convert keys iterator to key bytes for db
                       key. Default is self.Sep == '.'
            kwa (dict): extra keyword arguments; accepted and ignored here
        """
        super(KomerBase, self).__init__()
        self.db = db
        self.sdb = self.db.env.open_db(key=subkey.encode("utf-8"),
                                       dupsort=dupsort)
        self.schema = schema
        self.kind = kind
        self.serializer = self._serializer(kind)
        self.deserializer = self._deserializer(kind)
        self.sep = sep if sep is not None else self.Sep

    def _tokey(self, keys: Union[str, bytes, memoryview, Iterable]):
        """
        Converts keys to key bytes with proper separators and returns key bytes.
        If keys is a memoryview it is copied to bytes.
        If keys is a str it is encoded as utf-8.
        If keys is bytes it is returned as is.
        Otherwise keys is treated as an iterable of strs that is joined with
        .sep and then encoded as utf-8.

        Returns:
            key (bytes): database key

        Parameters:
            keys (Union[str, bytes, memoryview, Iterable]): str, bytes,
                memoryview, or Iterable of str.
        """
        if isinstance(keys, memoryview):  # memoryview of bytes
            return bytes(keys)  # return bytes
        if hasattr(keys, "encode"):  # str
            return keys.encode("utf-8")  # convert to bytes
        if hasattr(keys, "decode"):  # bytes
            return keys  # return as is
        return self.sep.join(keys).encode("utf-8")  # iterable so join

    def _tokeys(self, key: Union[str, bytes, memoryview]):
        """
        Converts key to keys tuple of strs by decoding (when bytes-like) and
        then splitting at separator .sep.

        Returns:
            keys (tuple): of str

        Parameters:
            key (Union[str, bytes, memoryview]): str, bytes, or memoryview of
                bytes.
        """
        if isinstance(key, memoryview):  # memoryview of bytes
            key = bytes(key)
        if hasattr(key, "decode"):  # bytes so decode; a str has no .decode
            key = key.decode("utf-8")
        return tuple(key.split(self.sep))

    def getItemIter(self, keys: Union[str, Iterable] = b""):
        """
        Returns:
            items (Iterator): of (keys, val) tuples over all the items in
                subdb whose key startswith key made from keys. Each yielded
                keys is a tuple of strs and each val is deserialized with
                .deserializer. Keys may be keyspace prefix to return branches
                of key space. When keys is empty then returns all items in
                subdb.

        Parameters:
            keys (Iterable): tuple of bytes or strs that may be a truncation
                of a full keys tuple in order to get all the items from
                multiple branches of the key space. If keys is empty then gets
                all items in database.
        """
        for key, val in self.db.getTopItemIter(db=self.sdb,
                                               key=self._tokey(keys)):
            yield (self._tokeys(key), self.deserializer(val))

    def _serializer(self, kind):
        """
        Returns serializer method for kind. Any kind other than
        coring.Serials.mgpk or coring.Serials.cbor selects JSON.

        Parameters:
            kind (str): serialization
        """
        if kind == coring.Serials.mgpk:
            return self.__serializeMGPK
        if kind == coring.Serials.cbor:
            return self.__serializeCBOR
        return self.__serializeJSON

    def _deserializer(self, kind):
        """
        Returns deserializer method for kind. Any kind other than
        coring.Serials.mgpk or coring.Serials.cbor selects JSON.

        Parameters:
            kind (str): deserialization
        """
        if kind == coring.Serials.mgpk:
            return self.__deserializeMGPK
        if kind == coring.Serials.cbor:
            return self.__deserializeCBOR
        return self.__deserializeJSON

    def _checkSchema(self, val):
        """
        Raises ValueError when val is not an instance of .schema

        Parameters:
            val (object): candidate instance to validate
        """
        if not isinstance(val, self.schema):
            raise ValueError("Invalid schema type={} of value={}, expected {}."
                             "".format(type(val), val, self.schema))

    def __deserializeJSON(self, val):
        """
        Returns instance of .schema from JSON serialization val or None when
        val is None. Raises ValueError if result is not instance of .schema
        """
        if val is None:
            return None
        obj = helping.datify(self.schema,
                             json.loads(bytes(val).decode("utf-8")))
        self._checkSchema(obj)
        return obj

    def __deserializeMGPK(self, val):
        """
        Returns instance of .schema from MsgPack serialization val or None when
        val is None. Raises ValueError if result is not instance of .schema
        """
        if val is None:
            return None
        obj = helping.datify(self.schema, msgpack.loads(bytes(val)))
        self._checkSchema(obj)
        return obj

    def __deserializeCBOR(self, val):
        """
        Returns instance of .schema from CBOR serialization val or None when
        val is None. Raises ValueError if result is not instance of .schema
        """
        if val is None:
            return None
        obj = helping.datify(self.schema, cbor2.loads(bytes(val)))
        self._checkSchema(obj)
        return obj

    def __serializeJSON(self, val):
        """
        Returns compact utf-8 encoded JSON serialization of val or None when
        val is None. Raises ValueError if val is not instance of .schema
        """
        if val is None:
            return None
        self._checkSchema(val)
        return json.dumps(helping.dictify(val),
                          separators=(",", ":"),
                          ensure_ascii=False).encode("utf-8")

    def __serializeMGPK(self, val):
        """
        Returns MsgPack serialization of val or None when val is None.
        Raises ValueError if val is not instance of .schema
        """
        if val is None:
            return None
        self._checkSchema(val)
        return msgpack.dumps(helping.dictify(val))

    def __serializeCBOR(self, val):
        """
        Returns CBOR serialization of val or None when val is None.
        Raises ValueError if val is not instance of .schema
        """
        if val is None:
            return None
        self._checkSchema(val)
        return cbor2.dumps(helping.dictify(val))
class Komer(KomerBase):
    """
    Keyspace Object Mapper factory class. Maps one dataclass instance of type
    .schema to each key in a sub db opened with dupsort=False.
    """

    def __init__(self, db: dbing.LMDBer, *,
                 subkey: str = 'docs.',
                 schema: Type[dataclass],  # class not instance
                 kind: str = coring.Serials.json,
                 **kwa):
        """
        Parameters:
            db (dbing.LMDBer): base db
            schema (Type[dataclass]): reference to Class definition for
                dataclass sub class
            subkey (str): LMDB sub database key
            kind (str): serialization/deserialization type
        """
        super().__init__(db=db,
                         subkey=subkey,
                         schema=schema,
                         kind=kind,
                         dupsort=False,
                         **kwa)

    def put(self, keys: Union[str, Iterable], val: dataclass):
        """
        Puts val at key made from keys. Does not overwrite.

        Parameters:
            keys (tuple): of key strs to be combined in order to form key
            val (dataclass): instance of dataclass of type self.schema as value

        Returns:
            result (bool): True if successful, False otherwise, such as key
                already in database.
        """
        dbkey = self._tokey(keys)
        raw = self.serializer(val)
        return self.db.putVal(db=self.sdb, key=dbkey, val=raw)

    def pin(self, keys: Union[str, Iterable], val: dataclass):
        """
        Pins (sets) val at key made from keys. Overwrites.

        Parameters:
            keys (tuple): of key strs to be combined in order to form key
            val (dataclass): instance of dataclass of type self.schema as value

        Returns:
            result (bool): True if successful. False otherwise.
        """
        dbkey = self._tokey(keys)
        raw = self.serializer(val)
        return self.db.setVal(db=self.sdb, key=dbkey, val=raw)

    def get(self, keys: Union[str, Iterable]):
        """
        Gets val at keys.

        Parameters:
            keys (tuple): of key strs to be combined in order to form key

        Returns:
            val (dataclass): instance of type self.schema,
                None if no entry at keys

        Usage:
            Use walrus operator to catch and raise missing entry
            if (val := mydb.get(keys)) is None:
                raise ExceptionHere
            use val here
        """
        raw = self.db.getVal(db=self.sdb, key=self._tokey(keys))
        return self.deserializer(raw)

    def getDict(self, keys: Union[str, Iterable]):
        """
        Gets dictified val at keys.

        Parameters:
            keys (tuple): of key strs to be combined in order to form key

        Returns:
            val (dict): dictified dataclass instance,
                None if no entry at keys

        Usage:
            Use walrus operator to catch and raise missing entry
            if (val := mydb.getDict(keys)) is None:
                raise ExceptionHere
            use val here
        """
        found = self.get(keys)
        if found is None:
            return None
        return helping.dictify(found)

    def rem(self, keys: Union[str, Iterable]):
        """
        Removes entry at keys.

        Parameters:
            keys (tuple): of key strs to be combined in order to form key

        Returns:
            result (bool): True if key exists so delete successful.
                False otherwise
        """
        dbkey = self._tokey(keys)
        return self.db.delVal(db=self.sdb, key=dbkey)

    def trim(self, keys: Union[str, Iterable] = b""):
        """
        Removes all entries whose keys startswith keys. Enables removal of
        whole branches of db key space. To ensure proper separation of a
        branch include empty string as last key in keys. For example
        ("a", "") deletes 'a.1' and 'a.2' but not 'ab'.

        Parameters:
            keys (tuple): of key strs to be combined in order to form key

        Returns:
            result (bool): True if key exists so delete successful.
                False otherwise
        """
        prefix = self._tokey(keys)
        return self.db.delTopVal(db=self.sdb, key=prefix)

    def cntAll(self):
        """
        Returns count of the items in subdb as reported by .db.cnt

        Returns:
            count: result of self.db.cnt for this Komer's sub db
        """
        return self.db.cnt(db=self.sdb)
class IoSetKomer(KomerBase):
    """
    Insertion Ordered Set Keyspace Object Mapper factory class that supports
    a set of distinct entries at a given effective database key but with
    dupsort==False. Effective data model is that there are multiple values in a
    set of values where every member of the set has the same key (duplicate key).
    The set of values is an ordered set using insertion order. Any given value
    may appear only once in the set (not a list).

    This works similarly to the IO value duplicates for the LMDBer class with a
    sub db of LMDB (dupsort==True) but without its size limitation of 511 bytes
    for each value when dupsort==True.
    Here the key is augmented with a hidden numbered suffix that provides an
    ordered set of values at each effective key (duplicate key). The suffix is
    appended and stripped transparently. The set of multiple items with
    duplicate keys are retrieved in insertion order when iterating or as a list
    of the set elements.

    Attributes:
        db (dbing.LMDBer): instance of LMDB database manager class
        sdb (lmdb._Database): instance of named sub db lmdb for this Komer
        schema (Type[dataclass]): class reference of dataclass subclass
        kind (str): serialization/deserialization type from coring.Serials
        serializer (types.MethodType): serializer method
        deserializer (types.MethodType): deserializer method
        sep (str): separator for combining keys tuple of strs into key bytes
    """

    def __init__(self, db: dbing.LMDBer, *,
                 subkey: str = 'recs.',
                 schema: Type[dataclass],  # class not instance
                 kind: str = coring.Serials.json,
                 **kwa):
        """
        Parameters:
            db (dbing.LMDBer): base db
            schema (Type[dataclass]): reference to Class definition for
                dataclass sub class
            subkey (str): LMDB sub database key
            kind (str): serialization/deserialization type
        """
        super(IoSetKomer, self).__init__(db=db, subkey=subkey, schema=schema,
                                         kind=kind, dupsort=False, **kwa)

    def put(self, keys: Union[str, Iterable], vals: list):
        """
        Puts all vals at effective key made from keys and hidden ordinal
        suffix that are not already in set of vals at key. Does not overwrite.

        Parameters:
            keys (tuple): of key strs to be combined in order to form key
            vals (list): dataclass instances each of type self.schema as values

        Returns:
            result (bool): result of self.db.putIoSetVals
        """
        vals = [self.serializer(val) for val in vals]
        return self.db.putIoSetVals(db=self.sdb,
                                    key=self._tokey(keys),
                                    vals=vals,
                                    sep=self.sep)

    def add(self, keys: Union[str, Iterable], val: dataclass):
        """
        Add val to vals at effective key made from keys and hidden ordinal
        suffix that is not already in set of vals at key. Does not overwrite.

        Parameters:
            keys (tuple): of key strs to be combined in order to form key
            val (dataclass): instance of type self.schema

        Returns:
            result (bool): True means unique value among duplications,
                           False means duplicate of same value already exists.
        """
        return self.db.addIoSetVal(db=self.sdb,
                                   key=self._tokey(keys),
                                   val=self.serializer(val),
                                   sep=self.sep)

    def pin(self, keys: Union[str, Iterable], vals: list):
        """
        Pins (sets) vals at effective key made from keys and hidden ordinal
        suffix. Overwrites. Removes all pre-existing vals that share same
        effective keys and replaces them with vals.

        Parameters:
            keys (tuple): of key strs to be combined in order to form key
            vals (list): dataclass instances each of type self.schema as values

        Returns:
            result (bool): True if successful, False otherwise.

        Raises:
            ValueError: if any val is not an instance of self.schema. Raised
                before anything is deleted so existing entries are preserved.
        """
        key = self._tokey(keys)
        # Serialize first so that an invalid val can not wipe existing entries.
        vals = [self.serializer(val) for val in vals]
        # Pass .sep so deletion uses the same suffix separator as insertion.
        self.db.delIoSetVals(db=self.sdb, key=key, sep=self.sep)
        return self.db.setIoSetVals(db=self.sdb,
                                    key=key,
                                    vals=vals,
                                    sep=self.sep)

    def get(self, keys: Union[str, Iterable]):
        """
        Gets vals list at effective key made from keys.

        Parameters:
            keys (tuple): of key strs to be combined in order to form key

        Returns:
            vals (list): each item in list is instance of type self.schema
                         empty list if no entry at keys
        """
        return [self.deserializer(val) for val in
                self.db.getIoSetValsIter(db=self.sdb,
                                         key=self._tokey(keys),
                                         sep=self.sep)]

    def getLast(self, keys: Union[str, Iterable]):
        """
        Gets last effective dup val at effective dup key made from keys.

        Parameters:
            keys (tuple): of key strs to be combined to form effective key

        Returns:
            val (Type[dataclass]): instance of type self.schema
                          None if no entry at keys
        """
        # NOTE(review): unlike the other IoSet calls in this class, sep is not
        # passed here so the db method's own default separator is used.
        # Confirm whether getIoSetValLast accepts sep and pass self.sep if so.
        val = self.db.getIoSetValLast(db=self.sdb, key=self._tokey(keys))
        if val is not None:
            val = self.deserializer(val)
        return val

    def getIter(self, keys: Union[str, Iterable]):
        """
        Gets vals iterator at effective key made from keys and hidden ordinal
        suffix. All vals in set of vals that share same effective key are
        retrieved in insertion order.

        Parameters:
            keys (Iterable): of key strs to be combined in order to form key

        Returns:
            vals (Iterator): instances of type self.schema.
                Raises StopIteration when done
        """
        for val in self.db.getIoSetValsIter(db=self.sdb,
                                            key=self._tokey(keys),
                                            sep=self.sep):
            yield self.deserializer(val)

    def cnt(self, keys: Union[str, Iterable]):
        """
        Return count of effective dup values at key made from keys, zero
        otherwise.

        Parameters:
            keys (tuple): of key strs to be combined in order to form key
        """
        return self.db.cntIoSetVals(db=self.sdb,
                                    key=self._tokey(keys),
                                    sep=self.sep)

    def rem(self, keys: Union[str, Iterable], val=None):
        """
        Removes entry at keys.

        Parameters:
            keys (tuple): of key strs to be combined in order to form key
            val (dataclass): instance of effective dup val at key to delete
                              if val is None then remove all values at key

        Returns:
            result (bool): True if key exists so delete successful.
                False otherwise
        """
        key = self._tokey(keys)
        if val is None:  # remove whole set at key
            return self.db.delIoSetVals(db=self.sdb, key=key, sep=self.sep)
        return self.db.delIoSetVal(db=self.sdb,
                                   key=key,
                                   val=self.serializer(val),
                                   sep=self.sep)

    def getItemIter(self, keys: Union[str, Iterable] = b""):
        """
        Get items iterator.

        Returns:
            items (Iterator): of (keys, val) tuples over all the items in
                subdb whose effective key startswith key made from keys.
                Keys may be keyspace prefix in order to return branches of key
                space. When keys is empty then returns all items in subdb.
                Returned keys in each item has ordinal suffix removed.

        Parameters:
            keys (Iterable): tuple of bytes or strs that may be a truncation
                of a full keys tuple in order to get all the items from
                multiple branches of the key space. If keys is empty then gets
                all items in database.
                Append "" to end of keys Iterable to ensure get properly
                separated top branch key.
        """
        for iokey, val in self.db.getTopItemIter(db=self.sdb,
                                                 key=self._tokey(keys)):
            key, _ = dbing.unsuffix(iokey, sep=self.sep)  # drop hidden ordinal
            yield (self._tokeys(key), self.deserializer(val))

    def getIoSetItem(self, keys: Union[str, Iterable]):
        """
        Gets (iokeys, val) ioitems list at key made from keys where key is
        apparent effective key and ioitems all have same apparent effective key.

        Parameters:
            keys (Iterable): of key strs to be combined in order to form key

        Returns:
            ioitems (list): each item in list is tuple (iokeys, val) where each
                    iokeys is actual key tuple including hidden suffix and
                    each val is instance of type self.schema
                    empty list if no entry at keys
        """
        return [(self._tokeys(iokey), self.deserializer(val))
                for iokey, val in
                self.db.getIoSetItems(db=self.sdb,
                                      key=self._tokey(keys),
                                      sep=self.sep)]

    def getIoSetItemIter(self, keys: Union[str, Iterable]):
        """
        Gets (iokeys, val) ioitems iterator at key made from keys where key is
        apparent effective key and items all have same apparent effective key.

        Parameters:
            keys (Iterable): of key strs to be combined in order to form key

        Returns:
            ioitems (Iterator): each item iterated is tuple (iokeys, val) where
                each iokeys is actual keys tuple including hidden suffix and
                each val is instance of type self.schema.
                Raises StopIteration when done
        """
        for iokey, val in self.db.getIoSetItemsIter(db=self.sdb,
                                                    key=self._tokey(keys),
                                                    sep=self.sep):
            yield (self._tokeys(iokey), self.deserializer(val))

    def getIoItemIter(self, keys: Union[str, Iterable] = b""):
        """
        Returns:
            items (Iterator): tuple (iokeys, val) over all the items in
                subdb whose key startswith effective key made from keys.
                Keys may be keyspace prefix to return branches of key space.
                When keys is empty then returns all items in subdb.
                Returned iokeys are not stripped of the hidden suffix.

        Parameters:
            keys (Iterable): tuple of bytes or strs that may be a truncation
                of a full keys tuple in order to get all the items from
                multiple branches of the key space. If keys is empty then gets
                all items in database.
                Append "" to end of keys Iterable to ensure get properly
                separated top branch key.
        """
        for iokey, val in self.db.getTopItemIter(db=self.sdb,
                                                 key=self._tokey(keys)):
            yield (self._tokeys(iokey), self.deserializer(val))

    def remIokey(self, iokeys: Union[str, bytes, memoryview, Iterable]):
        """
        Removes entry at iokeys.

        Parameters:
            iokeys (tuple): of key str or tuple of key strs to be combined
                            in order to form key

        Returns:
            result (bool): True if key exists so delete successful.
                False otherwise
        """
        return self.db.delIoSetIokey(db=self.sdb, iokey=self._tokey(iokeys))
class DupKomer(KomerBase):
    """
    Duplicate Keyspace Object Mapper factory class that supports multiple
    entries at a given database key (lmdb dupsort == True).

    Do not use if Komer schema instance serialized is greater than 511 bytes.
    This is a limitation of dupsort==True sub dbs in LMDB
    """

    def __init__(self, db: dbing.LMDBer, *,
                 subkey: str = 'recs.',
                 schema: Type[dataclass],  # class not instance
                 kind: str = coring.Serials.json,
                 **kwa):
        """
        Parameters:
            db (dbing.LMDBer): base db
            schema (Type[dataclass]): reference to Class definition for
                dataclass sub class
            subkey (str): LMDB sub database key
            kind (str): serialization/deserialization type
        """
        super(DupKomer, self).__init__(db=db, subkey=subkey, schema=schema,
                                       kind=kind, dupsort=True, **kwa)

    def put(self, keys: Union[str, Iterable], vals: list):
        """
        Puts all vals at key made from keys. Does not overwrite. Adds to
        existing dup values at key if any. Duplicate means another entry at the
        same key but the entry is still a unique value. Duplicates are inserted
        in lexicographic order not insertion order. Lmdb does not insert a
        duplicate unless it is a unique value for that key.

        Parameters:
            keys (tuple): of key strs to be combined in order to form key
            vals (list): dataclass instances each of type self.schema as values

        Returns:
            result (bool): result of self.db.putVals
        """
        vals = [self.serializer(val) for val in vals]
        return self.db.putVals(db=self.sdb, key=self._tokey(keys), vals=vals)

    def add(self, keys: Union[str, Iterable], val: dataclass):
        """
        Add val to vals at key made from keys. Does not overwrite. Adds to
        existing dup values at key if any. Duplicate means another entry at the
        same key but the entry is still a unique value. Duplicates are inserted
        in lexicographic order not insertion order. Lmdb does not insert a
        duplicate unless it is a unique value for that key.

        Parameters:
            keys (tuple): of key strs to be combined in order to form key
            val (dataclass): instance of type self.schema

        Returns:
            result (bool): True means unique value among duplications,
                           False means duplicate of same value already exists.
        """
        return self.db.addVal(db=self.sdb,
                              key=self._tokey(keys),
                              val=self.serializer(val))

    def pin(self, keys: Union[str, Iterable], vals: list):
        """
        Pins (sets) vals at key made from keys. Overwrites. Removes all
        pre-existing dup vals and replaces them with vals.

        Parameters:
            keys (tuple): of key strs to be combined in order to form key
            vals (list): dataclass instances each of type self.schema as values

        Returns:
            result (bool): True if successful, False otherwise.

        Raises:
            ValueError: if any val is not an instance of self.schema. Raised
                before anything is deleted so existing entries are preserved.
        """
        key = self._tokey(keys)
        # Serialize first so that an invalid val can not wipe existing entries.
        vals = [self.serializer(val) for val in vals]
        self.db.delVals(db=self.sdb, key=key)  # delete all values
        return self.db.putVals(db=self.sdb, key=key, vals=vals)

    def get(self, keys: Union[str, Iterable]):
        """
        Gets dup vals list at key made from keys.

        Parameters:
            keys (tuple): of key strs to be combined in order to form key

        Returns:
            vals (list): each item in list is instance of type self.schema
                         empty list if no entry at keys
        """
        return [self.deserializer(val) for val in
                self.db.getValsIter(db=self.sdb, key=self._tokey(keys))]

    def getLast(self, keys: Union[str, Iterable]):
        """
        Gets last dup val at key made from keys.

        Parameters:
            keys (tuple): of key strs to be combined in order to form key

        Returns:
            val (Type[dataclass]): instance of type self.schema
                          None if no entry at keys
        """
        val = self.db.getValLast(db=self.sdb, key=self._tokey(keys))
        if val is not None:
            val = self.deserializer(val)
        return val

    def getIter(self, keys: Union[str, Iterable]):
        """
        Gets dup vals iterator at key made from keys.
        Duplicates are retrieved in lexicographic order not insertion order.

        Parameters:
            keys (tuple): of key strs to be combined in order to form key

        Returns:
            iterator: vals each of type self.schema.
                Raises StopIteration when done
        """
        for val in self.db.getValsIter(db=self.sdb, key=self._tokey(keys)):
            yield self.deserializer(val)

    def cnt(self, keys: Union[str, Iterable]):
        """
        Return count of dup values at key made from keys, zero otherwise.

        Parameters:
            keys (tuple): of key strs to be combined in order to form key
        """
        return self.db.cntVals(db=self.sdb, key=self._tokey(keys))

    def rem(self, keys: Union[str, Iterable], val=None):
        """
        Removes entry at keys.

        Parameters:
            keys (tuple): of key strs to be combined in order to form key
            val (dataclass): instance of dup val at key to delete
                              if val is None then remove all values at key

        Returns:
            result (bool): True if key exists so delete successful.
                False otherwise
        """
        # Empty bytes is passed to db.delVals when no specific val is given,
        # which per the contract above removes every dup at key.
        val = self.serializer(val) if val is not None else b''
        return self.db.delVals(db=self.sdb, key=self._tokey(keys), val=val)