Source code for keri.db.dbing

# -*- encoding: utf-8 -*-
"""
keri.db.dbing module


import lmdb
db = lmdb.open("/tmp/keri_db_setup_test")
db.max_key_size()
511

# create named dbs  (core and tables)
    gDbEnv.open_db(b'core')
    gDbEnv.open_db(b'hid2did')  # table of dids keyed by hids
    gDbEnv.open_db(b'did2offer', dupsort=True)  # table of offer expirations keyed by offer relative dids
    gDbEnv.open_db(b'anon', dupsort=True)  # anonymous messages
    gDbEnv.open_db(b'expire2uid', dupsort=True)  # expiration to uid anon

The dupsort, integerkey, integerdup, and dupfixed parameters are ignored
if the database already exists.
The state of those settings is persistent and immutable per database.
See _Database.flags() to view the state of those options for an opened database.
A consequence of the immutability of these flags is that the default non-named
database will never have these flags set.

So dupsort only needs to be set the first time a named database is opened;
subsequent openings do not need to pass it.


May want to use buffers for reads of immutable serializations such as events
and sigs, i.e. anything that is read only and never read-modify-write.

"{:032x}".format(1024)
'00000000000000000000000000000400'

h = ["00", "01", "02", "0a", "0f", "10", "1a", "11", "1f", "f0", "a0"]
h.sort()
h
['00', '01', '02', '0a', '0f', '10', '11', '1a', '1f', 'a0', 'f0']

l
['a', 'aa', 'b', 'ba', 'aaa', 'baa']
l.sort()
l
['a', 'aa', 'aaa', 'b', 'ba', 'baa']

"""

import os
import shutil
import stat
from collections import abc
from contextlib import contextmanager
from typing import Union

import lmdb
from  ordered_set import OrderedSet as oset

from hio.base import filing

from hio.base import filing

from ..help import helping

ProemSize = 32  # does not include trailing separator
MaxProem = int("f"*(ProemSize), 16)
MaxON = int("f"*32, 16)  # largest possible ordinal number, sequence or first seen

SuffixSize = 32  # does not include trailing separator
MaxSuffix = int("f"*(SuffixSize), 16)

def dgKey(pre, dig):
    """
    Returns bytes DB key of the form pre + b'.' + dig

    Parameters:
        pre is qualified Base64 identifier prefix, bytes or str
        dig is qualified Base64 digest of serialized event, bytes or str

    Any argument that is a str is converted to bytes as utf-8 first.
    """
    # anything with an .encode method is treated as str
    parts = tuple(part.encode("utf-8") if hasattr(part, "encode") else part
                  for part in (pre, dig))
    return b'%s.%s' % parts
def onKey(pre, sn):
    """
    Returns bytes DB key of the form pre + b'.' + on, where on is the
    ordinal number sn rendered as 32 zero padded lowercase hex characters.

    Parameters:
        pre is qualified Base64 identifier prefix, bytes or str
            (str is converted to bytes as utf-8)
        sn is int ordinal number of event such as sequence number or
            first seen order number
    """
    prefix = pre.encode("utf-8") if hasattr(pre, "encode") else pre
    return b'%s.%032x' % (prefix, sn)
# Aliases of onKey so the intent is clear at the call site:
# snKey for sequence number keys, fnKey for first seen ordinal number keys.
snKey = fnKey = onKey
def dtKey(pre, dts):
    """
    Returns bytes DB key of the form pre + b'|' + dts

    Parameters:
        pre is qualified Base64 identifier prefix, bytes or str
        dts is extended tz aware ISO8601 datetime string of event, bytes or
            str, such as '2021-02-13T19:16:50.750302+00:00'

    Any argument that is a str is converted to bytes as utf-8 first.
    """
    # anything with an .encode method is treated as str
    parts = tuple(part.encode("utf-8") if hasattr(part, "encode") else part
                  for part in (pre, dts))
    return b'%s|%s' % parts
def splitKey(key, sep=b'.'):
    """
    Returns duple (pre, rest) obtained by splitting key at sep, where rest is
    whatever follows the prefix (dig, on, sn, fn or dts).

    A str key yields str elements; a bytes key yields bytes elements.
    A memoryview key is converted to bytes first. sep is coerced to the
    same type as key before splitting.

    Raises ValueError if key does not split into exactly two elements.

    Parameters:
        key is database key to split at sep
        sep is separator, bytes or str. Default is b'.'
    """
    if isinstance(key, memoryview):
        key = bytes(key)

    keyIsStr = hasattr(key, "encode")
    if keyIsStr and hasattr(sep, "decode"):  # str key with bytes sep
        sep = sep.decode("utf-8")
    elif not keyIsStr and hasattr(sep, "encode"):  # bytes key with str sep
        sep = sep.encode("utf-8")

    parts = tuple(key.split(sep))
    if len(parts) != 2:
        raise ValueError("Unsplittable key = {}".format(key))
    return parts
def splitKeyON(key):
    """
    Returns duple (pre, on) from key where on is the int ordinal number
    parsed from the hex formatted number part of key.

    Accepts bytes, str, or memoryview key. pre has the type that splitKey
    returns for that key.
    """
    raw = bytes(key) if isinstance(key, memoryview) else key
    pre, hexOn = splitKey(raw)
    return (pre, int(hexOn, 16))
# Aliases of splitKeyON so the intent is clear at the call site:
# splitKeySN for sequence number keys, splitKeyFN for first seen number keys.
splitKeySN = splitKeyFN = splitKeyON
def splitKeyDT(key):
    """
    Returns duple (pre, dt) from key where dt is the result of
    helping.fromIso8601 applied to the datetime string part of key.

    The datetime part is a TZ aware ISO8601 string such as
    '2021-02-13T19:16:50.750302+00:00' and is separated from pre by '|'.

    Accepts bytes, str, or memoryview key.
    """
    raw = bytes(key) if isinstance(key, memoryview) else key
    pre, dts = splitKey(raw, sep=b'|')
    text = dts.decode("utf-8") if hasattr(dts, "decode") else dts
    return (pre, helping.fromIso8601(text))
def suffix(key: Union[bytes, str, memoryview], ion: int, *, sep: Union[bytes, str]=b'.'):
    """
    Returns:
       iokey (bytes): actual DB key made by joining key and the insertion
                      ordering ordinal ion, rendered as 32 zero padded hex
                      characters, with separator sep.

    Parameters:
        key (Union[bytes, str, memoryview]): apparent effective database key
            (unsuffixed). str is encoded as utf-8, memoryview is copied to bytes
        ion (int): insertion ordering ordinal for set of vals
        sep (Union[bytes, str]): separator character(s) for concatenating suffix
    """
    if hasattr(key, "encode"):  # str
        key = key.encode("utf-8")
    elif isinstance(key, memoryview):
        key = bytes(key)

    sepBytes = sep.encode("utf-8") if hasattr(sep, "encode") else sep
    return sepBytes.join((key, b"%032x" % ion))
def unsuffix(iokey: Union[bytes, str, memoryview], *, sep: Union[bytes, str]=b'.'):
    """
    Returns:
       result (tuple): (key, ion) from splitting iokey at its rightmost
                       separator sep, where key is the bytes apparent
                       effective DB key and ion is the int insertion ordering
                       ordinal parsed from the stripped off hex suffix.

    Parameters:
        iokey (Union[bytes, str, memoryview]): actual (suffixed) database key.
            str is encoded as utf-8, memoryview is copied to bytes
        sep (Union[bytes, str]): separator character(s) used for the suffix
    """
    if hasattr(iokey, "encode"):  # str
        iokey = iokey.encode("utf-8")
    elif isinstance(iokey, memoryview):
        iokey = bytes(iokey)

    if hasattr(sep, "encode"):
        sep = sep.encode("utf-8")

    # split only once from the right so key itself may contain sep
    key, hexIon = iokey.rsplit(sep, 1)
    return (key, int(hexIon, 16))
def clearDatabaserDir(path):
    """
    Remove the directory tree at path if path exists, otherwise do nothing.

    Parameters:
        path is file system path of directory to remove
    """
    if not os.path.exists(path):
        return
    shutil.rmtree(path)
@contextmanager
def openLMDB(*, cls=None, name="test", temp=True, **kwa):
    """
    Context manager wrapper for LMDBer instances.
    Defaults to temporary databases.
    On exit of the 'with' block .close is called on the instance with
    clear set to the instance's .temp

    Parameters:
        cls is class to instantiate, LMDBer when None
        name is str name of LMDBer dirPath so can have multiple databasers
            at different directory path names that each use different name
        temp is Boolean, True means open in temporary directory, clear on close
            Otherwise open in persistent directory, do not clear on close
        kwa is any additional keyword arguments passed through to cls

    Usage:

    with openLMDB(name="gen1") as baser1:
        baser1.env  ....

    with openLMDB(name="gen2", cls=Baser) as baser2:
        baser2.env  ....
    """
    factory = LMDBer if cls is None else cls
    lmdber = None
    try:
        lmdber = factory(name=name, temp=temp, reopen=True, **kwa)
        yield lmdber
    finally:
        # lmdber stays None when construction itself failed
        if lmdber:
            lmdber.close(clear=lmdber.temp)  # clears if lmdber.temp
class LMDBer(filing.Filer):
    """
    LMDBer base class for LMDB manager instances.
    Creates a specific instance of an LMDB database directory and environment.

    Attributes:  (inherited)
        name (str): unique path component used in directory or file path name
        base (str): another unique path component inserted before name
        temp (bool): True means use /tmp directory
        headDirPath is head directory path
        path is full directory path
        perm is numeric os permissions for directory and/or file(s)
        filed (bool): True means .path ends in file.
                      False means .path ends in directory
        mode (str): file open mode if filed
        fext (str): file extension if filed
        file (File)
        opened is Boolean, True means directory created and if file then file
                is opened. False otherwise

    Attributes:
        env (lmdb.env): LMDB main (super) database environment
        readonly (bool): True means open LMDB env as readonly

    Properties:


    File/Directory Creation Mode Notes:
        .Perm provides default restricted access permissions to directory and/or files
        stat.S_ISVTX | stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
        0o1700==960

        stat.S_ISVTX  is Sticky bit. When this bit is set on a directory it means
            that a file in that directory can be renamed or deleted only by the
            owner of the file, by the owner of the directory, or by a privileged process.
            When this bit is set on a file it means nothing
        stat.S_IRUSR Owner has read permission.
        stat.S_IWUSR Owner has write permission.
        stat.S_IXUSR Owner has execute permission.
    """
    HeadDirPath = "/usr/local/var"  # default in /usr/local/var
    TailDirPath = "keri/db"
    CleanTailDirPath = "keri/clean/db"
    AltHeadDirPath = "~"  # put in ~ as fallback when desired not permitted
    AltTailDirPath = ".keri/db"
    AltCleanTailDirPath = ".keri/clean/db"
    TempHeadDir = "/tmp"
    TempPrefix = "keri_lmdb_"
    TempSuffix = "_test"
    Perm = stat.S_ISVTX | stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR  # 0o1700==960
    MaxNamedDBs = 96  # passed as max_dbs to lmdb.open in .reopen
def __init__(self, readonly=False, **kwa):
    """
    Initialize .env to None and .readonly from readonly, then delegate the
    remaining setup to the superclass initializer with kwa.

    Parameters:
        readonly (bool): True means open database in readonly mode
                         False means open database in read/write mode

    Parameters passed through kwa to the superclass:
        name (str): directory path name differentiator directory/file
            When system employs more than one keri installation, name allows
            differentiating each instance by name
        base (str): optional directory path segment inserted before name
            that allows further differentiation with a hierarchy. "" means
            optional.
        temp (bool): assign to .temp
            True then open in temporary directory, clear on close
            Otherwise then open persistent directory, do not clear on close
        headDirPath (str): optional head directory pathname for main database
            Default .HeadDirPath
        perm (int): optional numeric os dir permissions for database
            directory and database files. Default .Perm
            NOTE(review): this entry was previously labelled "mode (int) ...
            Default .DirMode"; .reopen documents it as perm, confirm.
        reopen (bool): True means (re)opened by this init
                       False means not (re)opened by this init but later
        clear (bool): True means remove directory upon close if reopen
                      False means do not remove directory upon close if reopen
        reuse (bool): True means reuse self.path if already exists
                      False means do not reuse but remake self.path
        clean (bool): True means path uses clean tail variant
                      False means path uses normal tail variant
        filed (bool): True means .path is file path not directory path
                      False means .path is directory path not file path
        mode (str): File open mode when filed
        fext (str): File extension when filed
    """
    self.env = None
    self.readonly = bool(readonly)
    super(LMDBer, self).__init__(**kwa)
def reopen(self, readonly=False, **kwa):
    """
    Delegate directory handling to the superclass .reopen with kwa and then
    open the lmdb environment at .path and assign it to .env

    Returns:
        opened (bool): True when the superclass reopen reported success and
                       .env is set. False otherwise. Also assigned to .opened

    Parameters:
        readonly (bool): True means open database in readonly mode
                         False means open database in read/write mode
            NOTE(review): because the default is False rather than None the
            'is not None' guard below always passes, so any call that omits
            readonly resets .readonly to False, including a value set
            earlier by __init__. Confirm whether the default should be None.

    Parameters passed through kwa to the superclass:
        temp (bool): assign to .temp
                     True means open in temporary directory, clear on close
                     False means open persistent directory, do not clear on close
        headDirPath (str): optional head directory pathname for main database
                           Default .HeadDirpath
        perm (int): optional numeric os dir permissions for database
                     directory and database files. Default .Perm
        clear (bool): True means remove directory upon close
                      False means do not remove directory upon close
        reuse (bool): True means reuse self.path if already exists
                      False means do not reuse but remake self.path
        clean (bool): True means path uses clean tail variant
                      False means path uses normal tail variant
        mode (str): file open mode when .filed
        fext (str): File extension when .filed
    """
    dirOpened = super(LMDBer, self).reopen(**kwa)
    if readonly is not None:
        self.readonly = readonly

    # open lmdb major database instance
    # creates files data.mdb and lock.mdb in the directory at .path
    options = dict(max_dbs=self.MaxNamedDBs,
                   map_size=104857600,  # 100 * 1024 * 1024 bytes
                   mode=self.perm,
                   readonly=self.readonly)
    self.env = lmdb.open(self.path, **options)

    self.opened = bool(dirOpened and self.env)
    return self.opened
def close(self, clear=False):
    """
    Close lmdb at .env if any, set .env to None, and then delegate to the
    superclass .close with clear.

    Returns:
        whatever the superclass .close returns

    Parameters:
       clear is boolean, True means clear lmdb directory
    """
    if self.env:
        try:
            self.env.close()
        except Exception:
            # Closing is best effort so a failure here must not prevent the
            # superclass cleanup below. Catching Exception, not a bare
            # except, lets KeyboardInterrupt and SystemExit propagate.
            pass

    self.env = None

    return super(LMDBer, self).close(clear=clear)
# For subdbs with no duplicate values allowed at each key. (dupsort==False)
def putVal(self, db, key, val):
    """
    Write serialized bytes val to location key in db
    Does not overwrite.

    Returns:
        result of txn.put with overwrite=False:
        True if val successfully written, False if val at key already exists

    Raises:
        KeyError: when lmdb raises BadValsizeError for key. The original
                  lmdb error is chained as the cause.

    Parameters:
        db is opened named sub db with dupsort=False
        key is bytes of key within sub db's keyspace
        val is bytes of value to be written
    """
    with self.env.begin(db=db, write=True, buffers=True) as txn:
        try:
            return txn.put(key, val, overwrite=False)
        except lmdb.BadValsizeError as ex:
            raise KeyError(f"Key: `{key}` is either empty, too big (for lmdb),"
                           " or wrong DUPFIXED size. ref) lmdb.BadValsizeError") from ex
def setVal(self, db, key, val):
    """
    Write serialized bytes val to location key in db
    Overwrites existing val if any

    Returns:
        result of txn.put: True if val successfully written else False

    Raises:
        KeyError: when lmdb raises BadValsizeError for key. The original
                  lmdb error is chained as the cause.

    Parameters:
        db is opened named sub db with dupsort=False
        key is bytes of key within sub db's keyspace
        val is bytes of value to be written
    """
    with self.env.begin(db=db, write=True, buffers=True) as txn:
        try:
            return txn.put(key, val)
        except lmdb.BadValsizeError as ex:
            raise KeyError(f"Key: `{key}` is either empty, too big (for lmdb),"
                           " or wrong DUPFIXED size. ref) lmdb.BadValsizeError") from ex
def getVal(self, db, key):
    """
    Return val at key in db
    Returns None if no entry at key

    Raises:
        KeyError: when lmdb raises BadValsizeError for key. The original
                  lmdb error is chained as the cause.

    Parameters:
        db is opened named sub db with dupsort=False
        key is bytes of key within sub db's keyspace

    NOTE(review): the transaction is opened with buffers=True and the value
    is returned after the transaction context exits. Confirm against the
    py-lmdb buffer lifetime rules whether callers must copy it (e.g. with
    bytes(val)) before relying on it.
    """
    with self.env.begin(db=db, write=False, buffers=True) as txn:
        try:
            return txn.get(key)
        except lmdb.BadValsizeError as ex:
            raise KeyError(f"Key: `{key}` is either empty, too big (for lmdb),"
                           " or wrong DUPFIXED size. ref) lmdb.BadValsizeError") from ex
def delVal(self, db, key):
    """
    Deletes value at key in db.

    Returns:
        result of txn.delete: True if key exists in database else False

    Raises:
        KeyError: when lmdb raises BadValsizeError for key. The original
                  lmdb error is chained as the cause.

    Parameters:
        db is opened named sub db with dupsort=False
        key is bytes of key within sub db's keyspace
    """
    with self.env.begin(db=db, write=True, buffers=True) as txn:
        try:
            return txn.delete(key)
        except lmdb.BadValsizeError as ex:
            raise KeyError(f"Key: `{key}` is either empty, too big (for lmdb),"
                           " or wrong DUPFIXED size. ref) lmdb.BadValsizeError") from ex
def cnt(self, db):
    """
    Return count of entries in db found by walking a cursor over the whole
    db, zero when the cursor yields nothing.

    Parameters:
        db is opened named sub db with dupsort=True
    """
    with self.env.begin(db=db, write=False, buffers=True) as txn:
        # iterating a cursor yields one (key, val) item per entry
        return sum(1 for _ in txn.cursor())
def getAllItemIter(self, db, key=b'', split=True, sep=b'.'):
    """
    Returns iterator over items in db starting at the first entry whose key
    is >= key and continuing to the end of db.

    When split is True each item is a tuple of the parts of the entry's key
    split at sep followed by the entry's val. When split is False each item
    is the duple (key, val). Keys are converted to bytes, vals are yielded as
    returned by the cursor.

    Works for both dupsort==False and dupsort==True
    Yields nothing when there is no entry at or after key.

    Parameters:
        db is opened named sub db with dupsort=False
        key is key location in db to resume replay,
               If empty then start at first key in database
        split (bool): True means split key at sep before returning
        sep (bytes): separator char for key
    """
    with self.env.begin(db=db, write=False, buffers=True) as txn:
        cursor = txn.cursor()
        if cursor.set_range(key):  # moved to entry at key >= key, first if empty
            for ckey, cval in cursor.iternext():
                ckey = bytes(ckey)
                if split:
                    yield (*ckey.split(sep), cval)
                else:
                    yield (ckey, cval)
def getTopItemIter(self, db, key=b''):
    """
    Iterates over branch of db given by key

    Returns:
        items (abc.Iterator): iterator of (full key, val) tuples over the
            entries whose key starts with the top key, where full key is the
            complete bytes database key of val, not the truncated top key

    Works for both dupsort==False and dupsort==True
    Yields nothing when no entry key starts with key.

    Parameters:
        db (lmdb._Database): instance of named sub db with dupsort==False
        key (bytes): truncated top key, a key space prefix to get all the items
                    from multiple branches of the key space. If top key is
                    empty then gets all items in database
    """
    with self.env.begin(db=db, write=False, buffers=True) as txn:
        cursor = txn.cursor()
        if not cursor.set_range(key):  # no entry at key >= top key
            return
        for rawKey, val in cursor.iternext():
            fullKey = bytes(rawKey)
            if fullKey.startswith(key):
                yield (fullKey, val)  # another entry in branch
            else:
                break  # walked past the end of the branch
def delTopVal(self, db, key=b''):
    """
    Deletes all values in branch of db given top key.

    Returns:
        result (bool): True if any value was deleted in the branch at key.
                       False otherwise

    Parameters:
        db (lmdb._Database): instance of named sub db with dupsort==False
        key (bytes): truncated top key, a key space prefix selecting all the
                    items whose key starts with it. If top key is empty then
                    deletes all items in database

    Works for both dupsort==False and dupsort==True

    Walks the branch with cursor.item() and cursor.delete() rather than
    cursor.iternext(), see the comment below for why.
    """
    # when deleting can't use cursor.iternext() because the cursor advances
    # twice (skips one) once for iternext and once for delete.
    with self.env.begin(db=db, write=True, buffers=True) as txn:
        result = False
        cursor = txn.cursor()
        if cursor.set_range(key):  # move to val at key >= key if any
            ckey, cval = cursor.item()
            while ckey:  # end of database key == b''
                ckey = bytes(ckey)
                if not ckey.startswith(key):  # prev entry if any last in branch
                    break  # done
                result = cursor.delete() or result  # delete moves cursor to next item
                ckey, cval = cursor.item()  # cursor now at next item after deleted
        return result
# For subdbs with no duplicate values allowed at each key. (dupsort==False) # and use keys with ordinal as monotonically increasing number part # such as sn or fn
def appendOrdValPre(self, db, pre, val):
    """
    Appends val in order after last previous key with same pre in db.
    Returns ordinal number, on, of appended entry. Appended on is 1 greater
    than previous latest on, or 0 when there is no entry yet at pre.
    Uses onKey(pre, on) for entries.

    Raises:
        ValueError: when the entry at pre with the maximum ordinal already
                    exists or when the put of the new entry fails

    Parameters:
        db is opened named sub db with dupsort=False
        pre is bytes identifier prefix for event. A str pre is converted to
            bytes as utf-8 so that it compares equal to the bytes prefixes
            split out of stored keys
        val is event digest
    """
    # Prefixes split out of stored keys are bytes. A str pre would never
    # compare equal to them so normalize once up front.
    if hasattr(pre, "encode"):
        pre = pre.encode("utf-8")

    # set key with on at max and then walk backwards to find last entry at pre
    # if any otherwise zeroth entry at pre
    key = onKey(pre, MaxON)
    with self.env.begin(db=db, write=True, buffers=True) as txn:
        on = 0  # unless other cases match then zeroth entry at pre
        cursor = txn.cursor()
        if not cursor.set_range(key):  # max is past end of database
            # so either empty database or last is earlier pre or
            # last is last entry at same pre
            if cursor.last():  # not empty db. last entry earlier than max
                ckey = cursor.key()
                cpre, cn = splitKeyON(ckey)
                if cpre == pre:  # last is last entry for same pre
                    on = cn + 1  # increment
        else:  # not past end so not empty either later pre or max entry at pre
            ckey = cursor.key()
            cpre, cn = splitKeyON(ckey)
            if cpre == pre:  # last entry for pre is already at max
                raise ValueError("Number part of key {} exceeds maximum"
                                 " size.".format(ckey))
            else:  # later pre so backup one entry
                # either no entry before last or earlier pre with entry
                if cursor.prev():  # prev entry, maybe same or earlier pre
                    ckey = cursor.key()
                    cpre, cn = splitKeyON(ckey)
                    if cpre == pre:  # last entry at pre
                        on = cn + 1  # increment

        key = onKey(pre, on)

        if not cursor.put(key, val, overwrite=False):
            raise ValueError("Failed appending {} at {}.".format(val, key))
        return on
def getAllOrdItemPreIter(self, db, pre, on=0):
    """
    Returns iterator of duple item, (on, dig), at each key over all ordinal
    numbered keys with same prefix, pre, in db starting at ordinal on.
    Values are sorted by onKey(pre, on) where on is ordinal number int.
    Returned items are duples of (on, dig) where on is ordinal number int
    and dig is event digest for lookup in .evts sub db.

    Yields nothing when there is no entry at pre at or after on.

    Parameters:
        db is opened named sub db with dupsort=False
        pre is bytes of identifier prefix. A str pre is converted to bytes
            as utf-8 so that it compares equal to the bytes prefixes split
            out of stored keys
        on is int ordinal number to resume replay
    """
    # Prefixes split out of stored keys are bytes. A str pre would never
    # compare equal to them so normalize once up front.
    if hasattr(pre, "encode"):
        pre = pre.encode("utf-8")

    with self.env.begin(db=db, write=False, buffers=True) as txn:
        cursor = txn.cursor()
        key = onKey(pre, on)  # start replay at this entry, 0 is earliest
        if not cursor.set_range(key):  # moves to val at key >= key
            return  # no values end of db

        for ckey, cval in cursor.iternext():  # get key, val at cursor
            cpre, cn = splitKeyON(ckey)
            if cpre != pre:  # prev is now the last event for pre
                break  # done
            yield (cn, cval)  # (on, dig) of event
def getAllOrdItemAllPreIter(self, db, key=b''):
    """
    Returns iterator of triple item, (pre, on, dig), at each key over all
    ordinal numbered keys for all prefixes in db starting at the first entry
    whose key is >= key.
    Values are sorted by onKey(pre, on) where on is ordinal number int.
    Each returned item is triple (pre, on, dig) where pre is identifier
    prefix, on is ordinal number int and dig is event digest for lookup in
    .evts sub db.

    Yields nothing when there is no entry at or after key.

    Parameters:
        db is opened named sub db with dupsort=False
        key is key location in db to resume replay,
               If empty then start at first key in database
    """
    with self.env.begin(db=db, write=False, buffers=True) as txn:
        cursor = txn.cursor()
        if cursor.set_range(key):  # moved to entry at key >= key, first if empty
            for ckey, dig in cursor.iternext():
                yield (*splitKeyON(ckey), dig)  # (pre, on, dig) of event
# For databases that support set of insertion ordered values with apparent # effective duplicate key but with (dupsort==False). Actual key uses hidden # key suffix ordinal to provide insertion ordering of value members of set # with same effective duplicate key. # Provides dupsort==True like functionality but without the associated value # size limitation of 511 bytes.
def putIoSetVals(self, db, key, vals, *, sep=b'.'):
    """
    Add each val in vals, that is not already in the set of vals at key, to
    the insertion ordered set of values all with the same apparent effective
    key.
    Uses hidden ordinal key suffix for insertion ordering.
    The suffix is appended and stripped transparently.

    Returns:
       result (bool): True if at least one val was added to set.
                      False if every val was already in set.

    Parameters:
        db (lmdb._Database): instance of named sub db with dupsort==False
        key (bytes): Apparent effective key. A str key is converted to bytes
            as utf-8 so that it compares equal to the bytes keys recovered
            from stored entries
        vals (abc.Iterable): serialized values to add to set of vals at key
        sep (bytes): separator character(s) between key and hidden suffix
    """
    # unsuffix returns bytes keys. A str key would never compare equal to
    # them so normalize once up front.
    if hasattr(key, "encode"):
        key = key.encode("utf-8")

    result = False
    vals = oset(vals)  # make set
    with self.env.begin(db=db, write=True, buffers=True) as txn:
        ion = 0
        iokey = suffix(key, ion, sep=sep)  # start zeroth entry if any
        cursor = txn.cursor()
        if cursor.set_range(iokey):  # move to val at key >= iokey if any
            pvals = oset()  # pre-existing vals at key
            for iokey, val in cursor.iternext():  # get iokey, val at cursor
                ckey, cion = unsuffix(iokey, sep=sep)
                if ckey == key:
                    pvals.add(val)  # another entry at key
                    ion = cion + 1  # ion to add at is increment of cion
                else:  # prev entry if any was the last entry for key
                    break  # done
            vals -= pvals  # remove vals already in pvals

        for i, val in enumerate(vals):
            iokey = suffix(key, ion+i, sep=sep)  # ion is at add on amount
            # put first so every val is attempted, not short circuited
            result = cursor.put(iokey,
                                val,
                                dupdata=False,
                                overwrite=False) or result
        return result
def addIoSetVal(self, db, key, val, *, sep=b'.'):
    """
    Add val to insertion ordered set of values all with the same apparent
    effective key if val not already in set of vals at key.
    Uses hidden ordinal key suffix for insertion ordering.
    The suffix is appended and stripped transparently.

    Returns:
       result (bool): True if added to set. False if already in set.

    Parameters:
        db (lmdb._Database): instance of named sub db with dupsort==False
        key (bytes): Apparent effective key. A str key is converted to bytes
            as utf-8 so that it compares equal to the bytes keys recovered
            from stored entries
        val (bytes): serialized value to add
        sep (bytes): separator character(s) between key and hidden suffix
    """
    # unsuffix returns bytes keys. A str key would never compare equal to
    # them so normalize once up front.
    if hasattr(key, "encode"):
        key = key.encode("utf-8")

    with self.env.begin(db=db, write=True, buffers=True) as txn:
        vals = oset()  # pre-existing vals at key
        ion = 0
        iokey = suffix(key, ion, sep=sep)  # start zeroth entry if any
        cursor = txn.cursor()
        if cursor.set_range(iokey):  # move to val at key >= iokey if any
            for iokey, cval in cursor.iternext():  # get iokey, val at cursor
                ckey, cion = unsuffix(iokey, sep=sep)
                if ckey == key:
                    vals.add(cval)  # another entry at key
                    ion = cion + 1  # ion to add at is increment of cion
                else:  # prev entry if any was the last entry for key
                    break  # done

        if val in vals:  # already in set
            return False

        iokey = suffix(key, ion, sep=sep)  # ion is at add on amount
        return cursor.put(iokey, val, dupdata=False, overwrite=False)
def setIoSetVals(self, db, key, vals, *, sep=b'.'):
    """
    Erase all vals at key and then add unique vals as insertion ordered set
    of values all with the same apparent effective key, numbered from zero.
    Uses hidden ordinal key suffix for insertion ordering.
    The suffix is appended and stripped transparently.

    Returns:
       result (bool): True if any val was written. False otherwise, such as
                      when vals is empty.

    Parameters:
        db (lmdb._Database): instance of named sub db with dupsort==False
        key (bytes): Apparent effective key
        vals (abc.Iterable): serialized values to add to set of vals at key
        sep (bytes): separator character(s) between key and hidden suffix

    NOTE(review): the erase happens via .delIoSetVals before the write
    transaction below is begun, so erase and write are separate steps.
    Confirm that is acceptable to callers.
    """
    self.delIoSetVals(db=db, key=key, sep=sep)
    added = False
    with self.env.begin(db=db, write=True, buffers=True) as txn:
        for ion, val in enumerate(oset(vals)):  # oset drops repeated vals
            written = txn.put(suffix(key, ion, sep=sep),
                              val,
                              dupdata=False,
                              overwrite=True)
            added = written or added
    return added
def appendIoSetVal(self, db, key, val, *, sep=b'.'):
    """
    Append val to insertion ordered set of values all with the same apparent
    effective key. Assumes val is not already in set.
    Uses hidden ordinal key suffix for insertion ordering.
    The suffix is appended and stripped transparently.

    Returns:
       ion (int): hidden insertion ordering ordinal of appended val

    Raises:
        ValueError: when the entry at key with the maximum ordinal already
                    exists or when the put of the new entry fails

    Parameters:
        db (lmdb._Database): instance of named sub db with dupsort==False
        key (bytes): Apparent effective key. A str key is converted to bytes
            as utf-8 so that it compares equal to the bytes keys recovered
            from stored entries
        val (bytes): value to append
        sep (bytes): separator character(s) between key and hidden suffix
    """
    # unsuffix returns bytes keys. A str key would never compare equal to
    # them so normalize once up front.
    if hasattr(key, "encode"):
        key = key.encode("utf-8")

    ion = 0  # default is zeroth insertion at key
    iokey = suffix(key, ion=MaxSuffix, sep=sep)  # make iokey at max and walk back
    with self.env.begin(db=db, write=True, buffers=True) as txn:
        cursor = txn.cursor()  # create cursor to walk back
        if not cursor.set_range(iokey):  # max is past end of database
            # Three possibilities for max past end of database
            # 1. last entry in db is for same key
            # 2. last entry in db is for other key before key
            # 3. database is empty
            if cursor.last():  # not 3. empty db, so either 1. or 2.
                ckey, cion = unsuffix(cursor.key(), sep=sep)
                if ckey == key:  # 1. last is last entry for same key
                    ion = cion + 1  # so set ion to the increment of cion
        else:  # max is not past end of database
            # Two possibilities for max not past end of database
            # 1. cursor at max entry at key
            # 2. other key after key with entry in database
            ckey, cion = unsuffix(cursor.key(), sep=sep)
            if ckey == key:  # 1. last entry for key is already at max
                raise ValueError("Number part of key {} at maximum"
                                 " size.".format(ckey))
            else:  # 2. other key after key so backup one entry
                # Two possibilities: 1. no prior entry 2. prior entry
                if cursor.prev():  # prev entry, maybe same or earlier pre
                    # 2. prior entry with two possibilities:
                    # 1. same key
                    # 2. other key before key
                    ckey, cion = unsuffix(cursor.key(), sep=sep)
                    if ckey == key:  # prior (last) entry at key
                        ion = cion + 1  # so set ion to the increment of cion

        iokey = suffix(key, ion=ion, sep=sep)
        if not cursor.put(iokey, val, overwrite=False):
            raise ValueError("Failed appending {} at {}.".format(val, key))

        return ion
def getIoSetVals(self, db, key, *, ion=0, sep=b'.'):
    """
    Returns:
        vals (list): the values, in insertion order, at same apparent
                     effective key starting at ordinal ion. Empty list if
                     none.
    Uses hidden ordinal key suffix for insertion ordering.
    The suffix is appended and stripped transparently.

    Parameters:
        db (lmdb._Database): instance of named sub db with dupsort==False
        key (bytes): Apparent effective key. A str key is converted to bytes
            as utf-8 so that it compares equal to the bytes keys recovered
            from stored entries
        ion (int): starting ordinal value, default 0
        sep (bytes): separator character(s) between key and hidden suffix
    """
    # unsuffix returns bytes keys. A str key would never compare equal to
    # them so normalize once up front.
    if hasattr(key, "encode"):
        key = key.encode("utf-8")

    with self.env.begin(db=db, write=False, buffers=True) as txn:
        vals = []
        iokey = suffix(key, ion, sep=sep)  # start ion th value for key zeroth default
        cursor = txn.cursor()
        if cursor.set_range(iokey):  # move to val at key >= iokey if any
            for iokey, val in cursor.iternext():  # get iokey, val at cursor
                ckey, _ = unsuffix(iokey, sep=sep)
                if ckey != key:  # prev entry if any was the last entry for key
                    break  # done
                vals.append(val)  # another entry at key
        return vals
def getIoSetValsIter(self, db, key, *, ion=0, sep=b'.'):
    """
    Returns:
        ioset (abc.Iterator): iterator over insertion ordered set of values
        at same apparent effective key starting at ordinal ion.
    Uses hidden ordinal key suffix for insertion ordering.
    The suffix is appended and stripped transparently.

    Yields nothing when there is no entry at key at or after ion.

    Parameters:
        db (lmdb._Database): instance of named sub db with dupsort==False
        key (bytes): Apparent effective key. A str key is converted to bytes
            as utf-8 so that it compares equal to the bytes keys recovered
            from stored entries
        ion (int): starting ordinal value, default 0
        sep (bytes): separator character(s) between key and hidden suffix
    """
    # unsuffix returns bytes keys. A str key would never compare equal to
    # them so normalize once up front.
    if hasattr(key, "encode"):
        key = key.encode("utf-8")

    with self.env.begin(db=db, write=False, buffers=True) as txn:
        iokey = suffix(key, ion, sep=sep)  # start ion th value for key zeroth default
        cursor = txn.cursor()
        if cursor.set_range(iokey):  # move to val at key >= iokey if any
            for iokey, val in cursor.iternext():  # get key, val at cursor
                ckey, _ = unsuffix(iokey, sep=sep)
                if ckey != key:  # prev entry if any was the last entry for key
                    break  # done
                yield val  # another entry at key
        return  # done
def getIoSetValLast(self, db, key, *, sep=b'.'):
    """
    Returns:
        val (bytes): last added entry at apparent effective key if any,
                     otherwise None if no entry

    Uses hidden ordinal key suffix for insertion ordering.
    The suffix is appended and stripped transparently.

    Parameters:
        db (lmdb._Database): instance of named sub db with dupsort==False
        key (bytes): Apparent effective key. A str key is converted to bytes
            as utf-8 so that it compares equal to the bytes keys recovered
            from stored entries
        sep (bytes): separator character(s) between key and hidden suffix
    """
    # unsuffix returns bytes keys. A str key would never compare equal to
    # them so normalize once up front.
    if hasattr(key, "encode"):
        key = key.encode("utf-8")

    val = None
    ion = None  # no last value
    iokey = suffix(key, ion=MaxSuffix, sep=sep)  # make iokey at max and walk back
    with self.env.begin(db=db, write=False, buffers=True) as txn:
        cursor = txn.cursor()  # create cursor to walk back
        if not cursor.set_range(iokey):  # max is past end of database
            # Three possibilities for max past end of database
            # 1. last entry in db is for same key
            # 2. last entry in db is for other key before key
            # 3. database is empty
            if cursor.last():  # not 3. empty db, so either 1. or 2.
                ckey, cion = unsuffix(cursor.key(), sep=sep)
                if ckey == key:  # 1. last is last entry for same key
                    ion = cion  # so set ion to cion
        else:  # max is not past end of database
            # Two possibilities for max not past end of database
            # 1. cursor at max entry at key
            # 2. other key after key with entry in database
            ckey, cion = unsuffix(cursor.key(), sep=sep)
            if ckey == key:  # 1. last entry for key is already at max
                ion = cion
            else:  # 2. other key after key so backup one entry
                # Two possibilities: 1. no prior entry 2. prior entry
                if cursor.prev():  # prev entry, maybe same or earlier pre
                    # 2. prior entry with two possibilities:
                    # 1. same key
                    # 2. other key before key
                    ckey, cion = unsuffix(cursor.key(), sep=sep)
                    if ckey == key:  # prior (last) entry at key
                        ion = cion  # so set ion to the cion

        if ion is not None:
            iokey = suffix(key, ion=ion, sep=sep)
            val = cursor.get(iokey)

        return val
def cntIoSetVals(self, db, key, *, sep=b'.'):
    """
    Count all values with the same apparent effective key.
    Uses hidden ordinal key suffix for insertion ordering.
    The suffix is appended and stripped transparently.

    Returns:
        count (int): number of values that .getIoSetVals returns for key

    Parameters:
        db (lmdb._Database): instance of named sub db with dupsort==False
        key (bytes): Apparent effective key
        sep (bytes): separator character(s) between key and hidden suffix
    """
    vals = self.getIoSetVals(db=db, key=key, sep=sep)
    return len(vals)
def delIoSetVals(self, db, key, *, sep=b'.'):
    """
    Deletes all values at apparent effective key.
    Uses hidden ordinal key suffix for insertion ordering.
    The suffix is appended and stripped transparently.

    Returns:
       result (bool): True if values were deleted at key. False otherwise
            if no values at key

    Parameters:
        db (lmdb._Database): instance of named sub db with dupsort==False
        key (bytes): Apparent effective key. A str key is converted to bytes
            as utf-8 so that it compares equal to the bytes keys recovered
            from stored entries
        sep (bytes): separator character(s) between key and hidden suffix
    """
    # unsuffix returns bytes keys. A str key would never compare equal to
    # them so normalize once up front.
    if hasattr(key, "encode"):
        key = key.encode("utf-8")

    result = False
    with self.env.begin(db=db, write=True, buffers=True) as txn:
        iokey = suffix(key, 0, sep=sep)  # start at zeroth value for key
        cursor = txn.cursor()
        if cursor.set_range(iokey):  # move to val at key >= iokey if any
            iokey, _ = cursor.item()
            while iokey:  # end of database iokey == b'' so can't use iternext
                ckey, _ = unsuffix(iokey, sep=sep)
                if ckey != key:  # past key
                    break
                result = cursor.delete() or result  # delete moves cursor to next item
                iokey, _ = cursor.item()  # cursor now at next item after deleted
        return result
def delIoSetVal(self, db, key, val, *, sep=b'.'):
    """
    Deletes val at apparent effective key if exists.
    Uses hidden ordinal key suffix for insertion ordering.
    The suffix is appended and stripped transparently.

    Because the insertion ordinal of val is not provided, a linear search
    over the set of values at key is performed.

    Vals may get added and deleted in any order so the max suffix ordinal
    may creep up over time. The suffix is 32 hex characters (see
    SuffixSize) so it will not max out in practice, but this is not the
    most elegant solution.

    In some cases a better approach is to use getIoSetItemsIter which
    returns the actual iokey, not the apparent effective key, so one can
    delete with delIoSetIokey without searching for the value. This is
    most applicable when processing escrows where all the escrowed items
    are processed linearly and some are deleted in stride.

    Returns:
        result (bool): True if val was deleted at key. False otherwise
            if val not found at key

    Parameters:
        db (lmdb._Database): instance of named sub db with dupsort==False
        key (bytes): Apparent effective key
        val (bytes): value to delete
        sep (bytes): separator between apparent key and ordinal suffix
    """
    with self.env.begin(db=db, write=True, buffers=True) as txn:
        cursor = txn.cursor()
        first = suffix(key, 0, sep=sep)  # zeroth iokey for key
        if not cursor.set_range(first):  # nothing at or after key
            return False
        for iokey, cval in cursor.iternext():
            ckey, _ = unsuffix(iokey, sep=sep)
            if ckey != key:  # walked past the entries for key
                break
            if val == cval:
                # returning at once, so the cursor move done by delete
                # does not disturb the iteration
                return cursor.delete()
        return False
def getIoSetItems(self, db, key, *, ion=0, sep=b'.'):
    """
    Returns:
        items (list): list of tuples (iokey, val) of entries in set with
            the same apparent effective key, starting at ordinal ion.
            iokey includes the ordinal key suffix. Empty list if none.

    Uses hidden ordinal key suffix for insertion ordering.

    Parameters:
        db (lmdb._Database): instance of named sub db with dupsort==False
        key (bytes): Apparent effective key
        ion (int): starting ordinal value, default 0
        sep (bytes): separator between apparent key and ordinal suffix
    """
    found = []
    with self.env.begin(db=db, write=False, buffers=True) as txn:
        cursor = txn.cursor()
        start = suffix(key, ion, sep=sep)  # iokey of ion th value for key
        if not cursor.set_range(start):  # nothing at or after start
            return found
        for iokey, val in cursor.iternext():
            ckey, _ = unsuffix(iokey, sep=sep)
            if ckey != key:  # walked past the entries for key
                break
            found.append((iokey, val))
        return found
def getIoSetItemsIter(self, db, key, *, ion=0, sep=b'.'):
    """
    Returns:
        items (abc.Iterator): iterator over insertion ordered set of values
            at same apparent effective key where each iteration returns
            tuple (iokey, val). iokey includes the ordinal key suffix.

    Uses hidden ordinal key suffix for insertion ordering.
    Raises StopIteration Error when empty.

    Parameters:
        db (lmdb._Database): instance of named sub db with dupsort==False
        key (bytes): Apparent effective key
        ion (int): starting ordinal value, default 0
        sep (bytes): separator between apparent key and ordinal suffix
    """
    with self.env.begin(db=db, write=False, buffers=True) as txn:
        cursor = txn.cursor()
        start = suffix(key, ion, sep=sep)  # iokey of ion th value for key
        if not cursor.set_range(start):  # nothing at or after start
            return
        for iokey, val in cursor.iternext():
            ckey, _ = unsuffix(iokey, sep=sep)
            if ckey != key:  # walked past the entries for key
                return
            yield (iokey, val)
def delIoSetIokey(self, db, iokey):
    """
    Deletes val at actual iokey that includes ordinal key suffix.

    Returns:
        result (bool): True if val was deleted at iokey. False otherwise
            if no val at iokey

    Raises:
        KeyError: when lmdb raises BadValsizeError for iokey

    Parameters:
        db (lmdb._Database): instance of named sub db with dupsort==False
        iokey (bytes): actual key with ordinal key suffix
    """
    with self.env.begin(db=db, write=True, buffers=True) as txn:
        try:
            removed = txn.delete(iokey)
        except lmdb.BadValsizeError:
            raise KeyError(f"Key: `{iokey}` is either empty, too big (for lmdb),"
                           " or wrong DUPFIXED size. ref) lmdb.BadValsizeError")
        return removed
# For subdbs that support duplicates at each key (dupsort==True)
def putVals(self, db, key, vals):
    """
    Write each entry from list of bytes vals to key in db.
    Adds to existing values at key if any.

    Every val in vals is always handed to txn.put. The returned flag is
    the conjunction of the individual put results, so one False result no
    longer prevents the remaining vals from being written.

    Returns:
        result (bool): True if every put reported True (also True when
            vals is empty), False otherwise. The original author noted
            that put apparently always returns True with dupdata=True.

    Raises:
        KeyError: when lmdb raises BadValsizeError for key

    Duplicates are inserted in lexicographic order not insertion order.
    Lmdb does not insert a duplicate unless it is a unique value for that
    key.

    Parameters:
        db is opened named sub db with dupsort=True
        key is bytes of key within sub db's keyspace
        vals is list of bytes of values to be written
    """
    result = True
    with self.env.begin(db=db, write=True, buffers=True) as txn:
        try:
            for val in vals:
                # put is evaluated first so that an earlier falsy result
                # can never short-circuit the write of a later val
                result = txn.put(key, val, dupdata=True) and result
        except lmdb.BadValsizeError as ex:
            raise KeyError(f"Key: `{key}` is either empty, too big (for lmdb),"
                           " or wrong DUPFIXED size. ref) lmdb.BadValsizeError") from ex
    return result
def addVal(self, db, key, val):
    """
    Add val bytes as dup to key in db.
    Adds to existing values at key if any.

    Returns:
        result (bool): result of the put if val was not already among the
            values at key, False if val already exists at key

    Raises:
        KeyError: when lmdb raises BadValsizeError for key

    Duplicates are inserted in lexicographic order not insertion order.
    Lmdb does not insert a duplicate unless it is a unique value for that
    key.

    The pre-existing values are read with .getVals and collected into a
    python set for the inclusion test.

    Parameters:
        db is opened named sub db with dupsort=True
        key is bytes of key within sub db's keyspace
        val is bytes of value to be written
    """
    existing = set(self.getVals(db, key))  # pre-existing dups if any
    if val in existing:
        return False  # already a dup so nothing is written

    with self.env.begin(db=db, write=True, buffers=True) as txn:
        try:
            written = txn.put(key, val, dupdata=True)
        except lmdb.BadValsizeError:
            raise KeyError(f"Key: `{key}` is either empty, too big (for lmdb),"
                           " or wrong DUPFIXED size. ref) lmdb.BadValsizeError")
    return written
def getVals(self, db, key):
    """
    Return list of values at key in db.
    Returns empty list if no entry at key.

    Duplicates are retrieved in lexicographic order not insertion order.

    Raises:
        KeyError: when lmdb raises BadValsizeError for key

    Parameters:
        db is opened named sub db with dupsort=True
        key is bytes of key within sub db's keyspace
    """
    with self.env.begin(db=db, write=False, buffers=True) as txn:
        cursor = txn.cursor()
        try:
            if not cursor.set_key(key):  # no entry at key
                return []
            # cursor is now at first dup so collect every dup at key
            return list(cursor.iternext_dup())
        except lmdb.BadValsizeError:
            raise KeyError(f"Key: `{key}` is either empty, too big (for lmdb),"
                           " or wrong DUPFIXED size. ref) lmdb.BadValsizeError")
def getValLast(self, db, key):
    """
    Return last dup value at key in db in lexicographic order.
    Returns None if no entry at key.

    Assumes DB opened with dupsort=True

    Raises:
        KeyError: when lmdb raises BadValsizeError for key

    Parameters:
        db is opened named sub db with dupsort=True
        key is bytes of key within sub db's keyspace
    """
    with self.env.begin(db=db, write=False, buffers=True) as txn:
        cursor = txn.cursor()
        try:
            # set_key moves to first dup, last_dup then moves to last dup
            if cursor.set_key(key) and cursor.last_dup():
                return cursor.value()
        except lmdb.BadValsizeError:
            raise KeyError(f"Key: `{key}` is either empty, too big (for lmdb),"
                           " or wrong DUPFIXED size. ref) lmdb.BadValsizeError")
        return None
def getValsIter(self, db, key):
    """
    Return iterator of all dup values at key in db.
    Raises StopIteration error when done or if empty.

    Duplicates are retrieved in lexicographic order not insertion order.

    Raises:
        KeyError: when lmdb raises BadValsizeError for key

    Parameters:
        db is opened named sub db with dupsort=True
        key is bytes of key within sub db's keyspace
    """
    with self.env.begin(db=db, write=False, buffers=True) as txn:
        cursor = txn.cursor()
        try:
            if not cursor.set_key(key):  # no entry at key
                return
            for dup in cursor.iternext_dup():  # starts at first dup
                yield dup
        except lmdb.BadValsizeError:
            raise KeyError(f"Key: `{key}` is either empty, too big (for lmdb),"
                           " or wrong DUPFIXED size. ref) lmdb.BadValsizeError")
def cntVals(self, db, key):
    """
    Return count of dup values at key in db, or zero otherwise.

    Raises:
        KeyError: when lmdb raises BadValsizeError for key

    Parameters:
        db is opened named sub db with dupsort=True
        key is bytes of key within sub db's keyspace
    """
    with self.env.begin(db=db, write=False, buffers=True) as txn:
        cursor = txn.cursor()
        try:
            # set_key moves to first dup at key when key is present
            return cursor.count() if cursor.set_key(key) else 0
        except lmdb.BadValsizeError:
            raise KeyError(f"Key: `{key}` is either empty, too big (for lmdb),"
                           " or wrong DUPFIXED size. ref) lmdb.BadValsizeError")
def cntValsAllPre(self, db, pre, on=0):
    """
    Returns:
        count (int): count of entries whose key has the same pre but
            different ordinal, starting at ordinal number on of pre and
            walking forward until the pre part of the key changes or the
            db ends.

    One is counted per key yielded by cursor.iternext(values=False).
    NOTE(review): the original doc states that dups are not counted;
    confirm that holds when db is opened with dupsort=True.

    Parameters:
        db is opened named sub db
        pre is bytes of key within sub db's keyspace pre.on
        on (int): ordinal number at which to start counting
    """
    with self.env.begin(db=db, write=False, buffers=True) as txn:
        cursor = txn.cursor()
        start = onKey(pre, on)  # on = 0 is the earliest entry for pre
        if not cursor.set_range(start):  # nothing at or after start
            return 0

        total = 0
        for ckey in cursor.iternext(values=False):  # keys only
            cpre, _ = splitKeyON(ckey)
            if cpre != pre:  # previous entry was the last one for pre
                break
            total += 1
        return total
def delVals(self, db, key, val=b''):
    """
    Deletes all values at key in db if val=b'' else deletes the dup
    that equals val.

    Returns:
        result (bool): True if key (and val if not empty) exists in db,
            False otherwise

    Raises:
        KeyError: when lmdb raises BadValsizeError for key

    Parameters:
        db is opened named sub db with dupsort=True
        key is bytes of key within sub db's keyspace
        val is bytes of dup val at key to delete
    """
    with self.env.begin(db=db, write=True, buffers=True) as txn:
        try:
            removed = txn.delete(key, val)
        except lmdb.BadValsizeError:
            raise KeyError(f"Key: `{key}` is either empty, too big (for lmdb),"
                           " or wrong DUPFIXED size. ref) lmdb.BadValsizeError")
        return removed
# For subdbs that support insertion order preserving duplicates at each key
# (dupsort==True). An ordinal proem is prepended to each value when written
# and stripped from each value when read.
def putIoVals(self, db, key, vals):
    """
    Write each entry from list of bytes vals to key in db in insertion
    order. Adds to existing values at key if any.

    Returns:
        result (bool): True if at least one of vals is added as dup,
            False otherwise

    Raises:
        KeyError: when lmdb raises BadValsizeError for key

    Assumes DB opened with dupsort=True

    Duplicates at a given key preserve insertion order. Because lmdb
    orders duplicates lexicographically, an insertion ordering proem is
    prepended to every value so that lexicographic order is the same as
    insertion order. The proem is 33 bytes long: a 32 character hex
    ordinal followed by '.'.

    Because the proem makes every stored value unique, duplicates must be
    checked for explicitly before insertion. The check uses a python set
    seeded with the values already at key (via .getIoVals) and updated
    with each val as it is written, so a value repeated within vals is
    only written once.

    Parameters:
        db is opened named sub db with dupsort=True
        key is bytes of key within sub db's keyspace
        vals is list of bytes of values to be written
    """
    result = False
    seen = set(self.getIoVals(db, key))  # pre-existing dups if any
    with self.env.begin(db=db, write=True, buffers=True) as txn:
        idx = 0  # ordinal for the next value written
        cursor = txn.cursor()
        try:
            # move to last dup at key if any and continue from its ordinal
            if cursor.set_key(key) and cursor.last_dup():
                idx = 1 + int(bytes(cursor.value()[:32]), 16)
        except lmdb.BadValsizeError as ex:
            raise KeyError(f"Key: `{key}` is either empty, too big (for lmdb),"
                           " or wrong DUPFIXED size. ref) lmdb.BadValsizeError") from ex

        for val in vals:
            if val in seen:  # already stored or already written this call
                continue
            seen.add(val)  # so a repeat later in vals is skipped
            txn.put(key, (b'%032x.' % (idx)) + val, dupdata=True)
            idx += 1
            result = True
    return result
def addIoVal(self, db, key, val):
    """
    Add val bytes as dup in insertion order to key in db.
    Adds to existing values at key if any.

    Delegates to .putIoVals with a single element list.

    Returns:
        result (bool): True if written else False if val is already a dup

    The actual value written includes a prepended proem ordinal.
    Assumes DB opened with dupsort=True

    Because lmdb orders duplicates lexicographically, an insertion
    ordering proem is prepended to every value so that lexicographic
    order is the same as insertion order. The proem is 33 bytes long:
    a 32 character hex ordinal followed by '.'.

    Parameters:
        db is opened named sub db with dupsort=True
        key is bytes of key within sub db's keyspace
        val is bytes of value to be written
    """
    single = [val]
    return self.putIoVals(db, key, single)
def getIoVals(self, db, key):
    """
    Return list of duplicate values at key in db in insertion order.
    Returns empty list if no entry at key.
    Removes prepended proem ordinal from each val before returning.

    Assumes DB opened with dupsort=True

    Because lmdb orders duplicates lexicographically, an insertion
    ordering proem is prepended to every stored value so that
    lexicographic order is the same as insertion order. The first 33
    bytes of each stored value (32 character hex ordinal followed by '.')
    are sliced off here.

    Raises:
        KeyError: when lmdb raises BadValsizeError for key

    Parameters:
        db is opened named sub db with dupsort=True
        key is bytes of key within sub db's keyspace
    """
    with self.env.begin(db=db, write=False, buffers=True) as txn:
        cursor = txn.cursor()
        try:
            if not cursor.set_key(key):  # no entry at key
                return []
            # cursor at first dup, strip ordering proem from each dup
            return [proval[33:] for proval in cursor.iternext_dup()]
        except lmdb.BadValsizeError:
            raise KeyError(f"Key: `{key}` is either empty, too big (for lmdb),"
                           " or wrong DUPFIXED size. ref) lmdb.BadValsizeError")
def getIoValsIter(self, db, key):
    """
    Return iterator of all duplicate values at key in db in insertion
    order. Raises StopIteration Error when no remaining dup items = empty.
    Removes prepended proem ordinal from each val before returning.

    Assumes DB opened with dupsort=True

    Because lmdb orders duplicates lexicographically, an insertion
    ordering proem is prepended to every stored value so that
    lexicographic order is the same as insertion order. The first 33
    bytes of each stored value (32 character hex ordinal followed by '.')
    are sliced off here.

    Raises:
        KeyError: when lmdb raises BadValsizeError for key

    Parameters:
        db is opened named sub db with dupsort=True
        key is bytes of key within sub db's keyspace
    """
    with self.env.begin(db=db, write=False, buffers=True) as txn:
        cursor = txn.cursor()
        try:
            if not cursor.set_key(key):  # no entry at key
                return
            for proval in cursor.iternext_dup():  # starts at first dup
                yield proval[33:]  # strip ordering proem
        except lmdb.BadValsizeError:
            raise KeyError(f"Key: `{key}` is either empty, too big (for lmdb),"
                           " or wrong DUPFIXED size. ref) lmdb.BadValsizeError")
def getIoValLast(self, db, key):
    """
    Return last added dup value at key in db in insertion order.
    Returns None if no entry at key.
    Removes prepended proem ordinal (first 33 bytes) from val before
    returning.

    Assumes DB opened with dupsort=True

    Raises:
        KeyError: when lmdb raises BadValsizeError for key

    Parameters:
        db is opened named sub db with dupsort=True
        key is bytes of key within sub db's keyspace
    """
    with self.env.begin(db=db, write=False, buffers=True) as txn:
        cursor = txn.cursor()
        try:
            # set_key moves to first dup, last_dup then moves to last dup
            if cursor.set_key(key) and cursor.last_dup():
                return cursor.value()[33:]  # strip ordering proem
            return None
        except lmdb.BadValsizeError:
            raise KeyError(f"Key: `{key}` is either empty, too big (for lmdb),"
                           " or wrong DUPFIXED size. ref) lmdb.BadValsizeError")
def getIoItemsNext(self, db, key=b"", skip=True):
    """
    Return list of all dup items at next key after key in db in insertion
    order. Item is (key, val) with proem (first 33 bytes) stripped from
    the val stored in db.

    If key == b'' then returns list of dup items at first key in db.
    If skip is False and key is not empty then returns dup items at key
    (or at the first key after it when key itself is absent).
    Returns empty list if no entries at next key after key.

    Use the key from the returned items as key for the next call to this
    function in order to iterate through the database.

    Assumes DB opened with dupsort=True

    Parameters:
        db is opened named sub db with dupsort=True
        key is bytes of key within sub db's keyspace or empty string
        skip is Boolean If True skips to next key if key is not empty
            string. Otherwise don't skip for first pass
    """
    with self.env.begin(db=db, write=False, buffers=True) as txn:
        cursor = txn.cursor()
        if not cursor.set_range(key):  # nothing at or after key
            return []
        # cursor is at first dup of first key >= key
        if skip and key and cursor.key() == key:
            # landed on key itself, so advance to first dup of next key
            if not cursor.next_nodup():
                return []  # key was the last key in db
        return [(ckey, proval[33:])  # strip ordering proem from value
                for ckey, proval in cursor.iternext_dup(keys=True)]
def getIoItemsNextIter(self, db, key=b"", skip=True):
    """
    Return iterator of all dup items at next key after key in db in
    insertion order. Item is (key, val) with proem (first 33 bytes)
    stripped from the val stored in db.

    If key == b'' then iterates dup items at first key in db.
    If skip is False and key is not empty then iterates dup items at key
    (or at the first key after it when key itself is absent).
    Raises StopIteration Error when no remaining dup items = empty.

    Use the key from the returned items as key for the next call to this
    function in order to iterate through the database.

    Assumes DB opened with dupsort=True

    Parameters:
        db is opened named sub db with dupsort=True
        key is bytes of key within sub db's keyspace or empty
        skip is Boolean If True skips to next key if key is not empty
            string. Otherwise don't skip for first pass
    """
    with self.env.begin(db=db, write=False, buffers=True) as txn:
        cursor = txn.cursor()
        if not cursor.set_range(key):  # nothing at or after key
            return
        # cursor is at first dup of first key >= key
        if skip and key and cursor.key() == key:
            # landed on key itself, so advance to first dup of next key
            if not cursor.next_nodup():
                return  # key was the last key in db
        for ckey, proval in cursor.iternext_dup(keys=True):
            yield (ckey, proval[33:])  # strip ordering proem from value
def cntIoVals(self, db, key):
    """
    Return count of dup values at key in db, or zero otherwise.

    Assumes DB opened with dupsort=True

    Raises:
        KeyError: when lmdb raises BadValsizeError for key

    Parameters:
        db is opened named sub db with dupsort=True
        key is bytes of key within sub db's keyspace
    """
    with self.env.begin(db=db, write=False, buffers=True) as txn:
        cursor = txn.cursor()
        try:
            if not cursor.set_key(key):  # no entry at key
                return 0
            return cursor.count()  # cursor is at first dup of key
        except lmdb.BadValsizeError:
            raise KeyError(f"Key: `{key}` is either empty, too big (for lmdb),"
                           " or wrong DUPFIXED size. ref) lmdb.BadValsizeError")
def delIoVals(self, db, key):
    """
    Deletes all values at key in db if key present.

    Returns:
        result (bool): True if key exists, False otherwise

    Raises:
        KeyError: when lmdb raises BadValsizeError for key

    Parameters:
        db is opened named sub db with dupsort=True
        key is bytes of key within sub db's keyspace
    """
    with self.env.begin(db=db, write=True, buffers=True) as txn:
        try:
            removed = txn.delete(key)
        except lmdb.BadValsizeError:
            raise KeyError(f"Key: `{key}` is either empty, too big (for lmdb),"
                           " or wrong DUPFIXED size. ref) lmdb.BadValsizeError")
        return removed
def delIoVal(self, db, key, val):
    """
    Deletes dup io val at key in db. Performs a linear search that strips
    the proem from each stored dup and compares the remainder with val.

    Returns:
        result (bool): result of the cursor delete if a match is found,
            False if val not present

    Raises:
        KeyError: when lmdb raises BadValsizeError for key

    Assumes DB opened with dupsort=True

    Duplicates at a given key preserve insertion order. Because lmdb
    orders duplicates lexicographically, an insertion ordering proem is
    prepended to every stored value so that lexicographic order is the
    same as insertion order. The proem is 33 bytes long: a 32 character
    hex ordinal followed by '.'.

    The search is linear so it is not very efficient when not deleting
    from the front. This is a hack for supporting escrows which need to
    delete individual dups. Escrow entries get added and deleted over
    time, which only ever grows the proem ordinal. A 32 hex character
    ordinal will not max out in practice, but it is not an elegant
    solution, so escrows may eventually need a different approach.

    Parameters:
        db is opened named sub db with dupsort=True
        key is bytes of key within sub db's keyspace
        val is bytes of value to be deleted without insertion ordering
            proem
    """
    with self.env.begin(db=db, write=True, buffers=True) as txn:
        cursor = txn.cursor()
        try:
            if not cursor.set_key(key):  # no entry at key
                return False
            for proval in cursor.iternext_dup():  # value with proem
                if val == proval[33:]:  # compare with proem stripped
                    return cursor.delete()
        except lmdb.BadValsizeError:
            raise KeyError(f"Key: `{key}` is either empty, too big (for lmdb),"
                           " or wrong DUPFIXED size. ref) lmdb.BadValsizeError")
        return False
def getIoValsAllPreIter(self, db, pre, on=0):
    """
    Returns iterator of all dup vals in insertion order for all entries
    with same prefix across all ordinal numbers in increasing order
    without gaps between ordinal numbers, starting with on, default 0.
    Stops at the first ordinal whose key is not in db.

    Keys are built with snKey(pre, ordinal).
    Removes prepended proem ordinal (first 33 bytes) from each val before
    returning.

    Raises StopIteration Error when empty.

    Parameters:
        db is opened named sub db with dupsort=True
        pre (bytes | str): identifier prefix prepended to sn in key
            within sub db's keyspace
        on (int): ordinal number to begin iteration at
    """
    with self.env.begin(db=db, write=False, buffers=True) as txn:
        cursor = txn.cursor()
        sn = on
        # set_key only succeeds on an exact key so a gap ends the walk
        while cursor.set_key(snKey(pre, sn)):
            for proval in cursor.iternext_dup():  # starts at first dup
                yield proval[33:]  # strip ordering proem
            sn += 1
def getIoValsAllPreBackIter(self, db, pre, on=0):
    """
    Returns iterator of all dup vals for all entries with same prefix
    across all sequence numbers in decreasing order without gaps between
    ordinals at a given pre, starting with on (default 0) as the beginning
    ordinal number. Within one key the dups are yielded in insertion
    order. Stops at the first ordinal whose key is not in db, or after
    ordinal 0 has been processed.

    Keys are built with snKey(pre, ordinal).
    Removes prepended proem ordinal (first 33 bytes) from each val before
    returning.

    Raises StopIteration Error when empty.

    Parameters:
        db is opened named sub db with dupsort=True
        pre is bytes of identifier prefix prepended to sn in key
            within sub db's keyspace
        on (int): ordinal number to begin iteration
    """
    with self.env.begin(db=db, write=False, buffers=True) as txn:
        cursor = txn.cursor()
        sn = on
        # Ordinals are non-negative, so stop explicitly once 0 has been
        # handled instead of asking snKey for ordinal -1 and relying on
        # that key being absent. set_key only succeeds on an exact key
        # so a gap also ends the walk.
        while sn >= 0 and cursor.set_key(snKey(pre, sn)):
            for proval in cursor.iternext_dup():  # starts at first dup
                yield proval[33:]  # strip ordering proem
            sn -= 1
def getIoValLastAllPreIter(self, db, pre, on=0):
    """
    Returns iterator of only the last dup val at each key, in insertion
    order, for all entries with same prefix across all sequence numbers
    in increasing order without gaps, starting with on (default 0).
    Stops at the first ordinal whose key is not in db.

    Keys are built with snKey(pre, ordinal).
    Removes prepended proem ordinal (first 33 bytes) from each val before
    returning.

    Raises StopIteration Error when empty.

    Parameters:
        db is opened named sub db with dupsort=True
        pre is bytes of identifier prefix prepended to sn in key
            within sub db's keyspace
        on (int): ordinal number to begin iteration
    """
    with self.env.begin(db=db, write=False, buffers=True) as txn:
        cursor = txn.cursor()
        sn = on
        # set_key only succeeds on an exact key so a gap ends the walk
        while cursor.set_key(snKey(pre, sn)):  # at first dup
            if cursor.last_dup():  # move to last dup
                yield cursor.value()[33:]  # strip ordering proem
            sn += 1
def getIoValsAnyPreIter(self, db, pre, on=0):
    """
    Returns iterator of all dup vals in insertion order for any entries
    with same prefix across all ordinal numbers in order, including gaps
    between ordinals at a given pre, starting with on (default 0).
    Stops when pre is different or the db ends.

    Duplicates that may be deleted, such as duplicitous event logs, need
    to be iterated across gaps in ordinal number.

    Keys are built with snKey(pre, ordinal). The actual key found in db
    is split on its last '.' into prefix and hex ordinal.
    Removes prepended proem ordinal (first 33 bytes) from each val before
    returning.

    Raises StopIteration Error when empty.

    Parameters:
        db is opened named sub db with dupsort=True
        pre (bytes | str): identifier prefix prepended to sn in key
            within sub db's keyspace. A str is converted to bytes so
            it compares correctly with keys read from db.
        on (int): beginning ordinal number to start iteration
    """
    if hasattr(pre, "encode"):
        # keys read back from db are bytes, so a str pre would never
        # compare equal and the iterator would silently be empty
        pre = pre.encode("utf-8")

    with self.env.begin(db=db, write=False, buffers=True) as txn:
        cursor = txn.cursor()
        key = snKey(pre, on)
        while cursor.set_range(key):  # first dup of first key >= key
            # split on the last '.' so that only the trailing ordinal is
            # separated, whatever the prefix contains
            front, back = bytes(cursor.key()).rsplit(b'.', 1)
            if front != pre:  # set_range moved on to another prefix
                break
            for proval in cursor.iternext_dup():
                yield proval[33:]  # strip ordering proem
            # resume just after the ordinal actually found to jump gaps
            key = snKey(pre, int(back, 16) + 1)