Source code for keri.core.annotating

# -*- coding: utf-8 -*-
"""
keri.core.annotating module

Provides support for Annotater
"""
from ..kering import (sniff, Colds, Ilks, IlkError,
                      ExtractionError, ColdStartError)

from .coring import (Verser, Ilker, Diger, Prefixer, Number, Tholder,
                     Verfer, Traitor)
from .counting import Counter
from .structing import Sealer
from .serdering import Serder


[docs] def annot(ims): """Annotate CESR stream Returns: annotation (str): annotation of input CESR stream Parameters: ims (str | bytes | bytearray | memoryview): CESR incoming message stream as qb64 (maybe qb2) """ oms = bytearray() indent = 0 if not isinstance(ims, bytearray): # going to strip ims = bytearray(ims) # so make bytearray copy, converts str to bytearray while ims: # right now just for KERI event messages cold = sniff(ims) # check for spurious counters at front of stream if cold in Colds.txt: val = Counter(qb64b=ims, strip=True) oms.extend(f"{' ' * indent * 2}{val.qb64} # Key Event Counter " f"{val.name} count={val.count} quadlets\n".encode()) indent += 1 # should grab fill message frame here so can verify its consumed # when done annotating # version val = Verser(qb64b=ims, strip=True) versage = val.versage oms.extend(f"{' ' * indent * 2}{val.qb64} # 'v' version Verser {val.name} " f"proto={versage.proto} vrsn={versage.pvrsn.major}." f"{versage.pvrsn.minor:02}\n".encode()) # ilk ilker = Ilker(qb64b=ims, strip=True) ilk = ilker.ilk oms.extend(f"{' ' * indent * 2}{ilker.qb64} # 't' message type Ilker " f"{ilker.name} Ilk={ilker.ilk}\n".encode()) if ilk in (Ilks.icp, Ilks.ixn, Ilks.rot, Ilks.dip, Ilks.drt): # said val = Diger(qb64b=ims, strip=True) oms.extend(f"{' ' * indent * 2}{val.qb64} # 'd' SAID Diger " f"{val.name} \n".encode()) # aid pre val = Prefixer(qb64b=ims, strip=True) oms.extend(f"{' ' * indent * 2}{val.qb64} # 'i' AID Prefixer " f"{val.name} \n".encode()) # sn val = Number(qb64b=ims, strip=True) oms.extend(f"{' ' * indent * 2}{val.qb64} # 's' Number " f"{val.name} sn={val.sn}\n".encode()) if ilk in (Ilks.icp, Ilks.dip): # inception # v='', t='', d='', i='', s='0', kt='0',k=[], nt='0', n=[], bt='0', b=[], c=[], a=[] # Signing key threshold val = Tholder(limen=ims, strip=True) # add qb64 and qb2 to tholder as aliases for limen and limen.decode() oms.extend(f"{' ' * indent * 2}{val.limen.decode()} # 'kt' " f"Tholder signing 
threshold={val.sith}\n".encode()) # Signing key list val = Counter(qb64b=ims, strip=True) oms.extend(f"{' ' * indent * 2}{val.qb64} # 'k' Signing Key " f"List Counter {val.name} count={val.count} quadlets\n".encode()) indent += 1 frame = ims[:val.count*4] # extract frame from ims del ims[:val.count*4] # strip frame while frame: val = Verfer(qb64b=frame, strip=True) oms.extend(f"{' ' * indent * 2}{val.qb64} # key Verfer " f"{val.name}\n".encode()) indent -= 1 # Rotation key threshold val = Tholder(limen=ims, strip=True) oms.extend(f"{' ' * indent * 2}{val.limen.decode()} # 'nt' " f"Tholder rotation threshold={val.sith}\n".encode()) # Next key digest list val = Counter(qb64b=ims, strip=True) oms.extend(f"{' ' * indent * 2}{val.qb64} # 'n' Rotation Key " f"Digest List Counter {val.name} count={val.count} " f"quadlets\n".encode()) indent += 1 frame = ims[:val.count*4] # extract frame from ims del ims[:val.count*4] # strip frame while frame: val = Diger(qb64b=frame, strip=True) oms.extend(f"{' ' * indent * 2}{val.qb64} # key digest Diger" f" {val.name}\n".encode()) indent -= 1 # Witness Backer threshold val = Tholder(limen=ims, strip=True) oms.extend(f"{' ' * indent * 2}{val.limen.decode()} # 'bt' " f"Tholder Backer (witness) threshold={val.sith}\n".encode()) # Witness Backer list val = Counter(qb64b=ims, strip=True) oms.extend(f"{' ' * indent * 2}{val.qb64} # 'b' Backer (witness)" f"List Counter {val.name} count={val.count} quadlets\n".encode()) indent += 1 frame = ims[:val.count*4] # extract frame from ims del ims[:val.count*4] # strip frame while frame: val = Prefixer(qb64b=frame, strip=True) oms.extend(f"{' ' * indent * 2}{val.qb64} # AID Prefixer " f"{val.name}\n".encode()) indent -= 1 # Config Trait List val = Counter(qb64b=ims, strip=True) oms.extend(f"{' ' * indent * 2}{val.qb64} # 'c' Config Trait " f"List Counter {val.name} count={val.count} quadlets\n".encode()) indent += 1 frame = ims[:val.count*4] # extract frame from ims del ims[:val.count*4] # strip frame 
while frame: val = Traitor(qb64b=frame, strip=True) oms.extend(f"{' ' * indent * 2}{val.qb64} # trait Traitor " f"{val.name} trait={val.trait}\n".encode()) indent -= 1 elif ilk == Ilks.ixn: # Interaction # v='', t='',d='', i='', s='0', p='', a=[] # Prior said val = Diger(qb64b=ims, strip=True) oms.extend(f"{' ' * indent * 2}{val.qb64} # 'p' prior SAID Diger " f"{val.name} \n".encode()) elif ilk in (Ilks.rot, Ilks.drt): # Rotation # v='', t='',d='', i='', s='0', p='', kt='0',k=[], nt='0', n=[], bt='0', br=[], ba=[], c=[], a=[] # Prior said val = Diger(qb64b=ims, strip=True) oms.extend(f"{' ' * indent * 2}{val.qb64} # 'p' prior SAID Diger " f"{val.name} \n".encode()) # Signing key threshold val = Tholder(limen=ims, strip=True) # add qb64 and qb2 to tholder as aliases for limen and limen.decode() oms.extend(f"{' ' * indent * 2}{val.limen.decode()} # 'kt' " f"Tholder signing threshold={val.sith}\n".encode()) # Signing key list val = Counter(qb64b=ims, strip=True) oms.extend(f"{' ' * indent * 2}{val.qb64} # 'k' Signing Key " f"List Counter {val.name} count={val.count} quadlets\n".encode()) indent += 1 frame = ims[:val.count*4] # extract frame from ims del ims[:val.count*4] # strip frame while frame: val = Verfer(qb64b=frame, strip=True) oms.extend(f"{' ' * indent * 2}{val.qb64} # key Verfer " f"{val.name}\n".encode()) indent -= 1 # Rotation key threshold val = Tholder(limen=ims, strip=True) oms.extend(f"{' ' * indent * 2}{val.limen.decode()} # 'nt' " f"Tholder rotation threshold={val.sith}\n".encode()) # Next key digest list val = Counter(qb64b=ims, strip=True) oms.extend(f"{' ' * indent * 2}{val.qb64} # 'n' Rotation Key " f"Digest List Counter {val.name} count={val.count} " f"quadlets\n".encode()) indent += 1 frame = ims[:val.count*4] # extract frame from ims del ims[:val.count*4] # strip frame while frame: val = Diger(qb64b=frame, strip=True) oms.extend(f"{' ' * indent * 2}{val.qb64} # key digest Diger" f" {val.name}\n".encode()) indent -= 1 # Witness Backer threshold 
val = Tholder(limen=ims, strip=True) oms.extend(f"{' ' * indent * 2}{val.limen.decode()} # 'bt' " f"Tholder Backer (witness) threshold={val.sith}\n".encode()) # Witness Backer Cut list val = Counter(qb64b=ims, strip=True) oms.extend(f"{' ' * indent * 2}{val.qb64} # 'b' Backer (witness)" f"Cut List Counter {val.name} count={val.count} quadlets\n".encode()) indent += 1 frame = ims[:val.count*4] # extract frame from ims del ims[:val.count*4] # strip frame while frame: val = Prefixer(qb64b=frame, strip=True) oms.extend(f"{' ' * indent * 2}{val.qb64} # AID Prefixer " f"{val.name}\n".encode()) indent -= 1 # Witness Backer Add list val = Counter(qb64b=ims, strip=True) oms.extend(f"{' ' * indent * 2}{val.qb64} # 'b' Backer (witness)" f"Add List Counter {val.name} count={val.count} quadlets\n".encode()) indent += 1 frame = ims[:val.count*4] # extract frame from ims del ims[:val.count*4] # strip frame while frame: val = Prefixer(qb64b=frame, strip=True) oms.extend(f"{' ' * indent * 2}{val.qb64} # AID Prefixer " f"{val.name}\n".encode()) indent -= 1 # Config Trait List val = Counter(qb64b=ims, strip=True) oms.extend(f"{' ' * indent * 2}{val.qb64} # 'c' Config Trait " f"List Counter {val.name} count={val.count} quadlets\n".encode()) indent += 1 frame = ims[:val.count*4] # extract frame from ims del ims[:val.count*4] # strip frame while frame: val = Traitor(qb64b=frame, strip=True) oms.extend(f"{' ' * indent * 2}{val.qb64} # trait Traitor " f"{val.name} trait={val.trait}\n".encode()) indent -= 1 else: raise IlkError(f"Unexpected message type={ilk}") if ilk in (Ilks.icp, Ilks.ixn, Ilks.rot, Ilks.dip, Ilks.drt): # Seal (anchor) List val = Counter(qb64b=ims, strip=True) oms.extend(f"{' ' * indent * 2}{val.qb64} # 'a' Seal List Counter" f" {val.name} count={val.count} quadlets\n".encode()) indent += 1 frame = ims[:val.count*4] # extract frame from ims del ims[:val.count*4] # strip frame while frame: val = Counter(qb64b=frame, strip=True) oms.extend(f"{' ' * indent * 2}{val.qb64} # 
Seal Counter " f"{val.name} count={val.count} quadlets\n".encode()) indent += 1 subframe = frame[:val.count*4] del frame[:val.count*4] # strip subframe clan = Sealer.Clans[Serder.CodeClans[val.code]] while subframe: val = Sealer(clan=clan, qb64=subframe, strip=True) # need to add qb64 parameter to structor oms.extend(f"{' ' * indent * 2}{val.qb64}# seal Sealer {val.name}\n".encode()) indent += 1 for i, t in enumerate(val.crew._asdict().items()): oms.extend(f"{' ' * indent * 2}# '{t[0]}' = " f"{t[1]}\n".encode()) indent -= 1 indent -= 1 indent -= 1 if ilk == Ilks.dip: # delegator aid delpre val = Prefixer(qb64b=ims, strip=True) oms.extend(f"{' ' * indent * 2}{val.qb64} # 'di' Delegator AID Prefixer " f"{val.name} \n".encode()) if ims: raise ExtractionError(f"Unexpected remaining bytes in stream.") elif cold in Colds.bny: pass else: raise ColdStartError("Expecting stream tritet={}" "".format(cold)) return oms.decode() # return unicode string
def denot(ams):
    """De-annotate CESR stream

    Every line is cut at its first '#' (the start of the annotation
    comment, if any), the remaining text is stripped of surrounding white
    space, and the non-empty remainders are concatenated in line order.

    Returns:
        dms (bytes): deannotation of input annotated CESR message stream

    Parameters:
        ams (str | bytes | bytearray | memoryview): CESR annotated message
            stream text. Bytes-like input is decoded to str first.
    """
    if not isinstance(ams, str):
        ams = bytes(ams).decode()  # work in str so '#' partition applies

    # text before the comment marker with white space removed, comment-only
    # and blank lines contribute the empty string
    fronts = (line.partition('#')[0].strip() for line in ams.splitlines())
    return "".join(fronts).encode()