Source code for keri.core.parsing

# -*- encoding: utf-8 -*-
"""
keri.core.parsing module

message stream parsing support
"""
import copy
import logging
from dataclasses import asdict
from collections import deque
from base64 import urlsafe_b64encode as encodeB64

from hio.help import ogler

from ..kering import (Colds, sniff, Vrsn_2_0, Version, Ilks,
                      UnexpectedCountCodeError, ValidationError,
                      QueryNotFoundError, ExtractionError, ShortageError,
                      ColdStartError, InvalidVersionError,
                      SizedGroupError, TopLevelStreamError)

from .coring import (Seqner, Cigar, Diger, Noncer, Labeler, Number, Verser,
                     Dater, Verfer, Prefixer, Saider, Texter)
from .counting import Counter, Codens, CtrDex_1_0, CtrDex_2_0, GenDex
from .indexing import Siger
from .serdering import Serdery, SerderKERI, SerderACDC


logger = ogler.getLogger()


class Parser:
    """Stream parser that processes an incoming message stream.

    Each message in the stream is composed of a message body with a message
    foot. The message body includes a version string. The message foot is
    composed of composable concatenated attachments encoded in CESR
    (Composable Event Streaming Representation). CESR supports both binary and
    text formats where text is Base64 URL/Filesafe. The attachments in a CESR
    foot may be converted and round tripped en-masse between binary and text
    (Base64 URL/File). CESR encoding ensures alignment on 24 bit boundaries.

    Only supports current version VERSION

    Class Attributes:
        Codes (dict): count code tables from Counter
        SUCodes (dict): special universal code tables from Counter
        MUCodes (dict): message universal code tables from Counter
        Methods (dict): deep copy of Counter.Codes reshaped so that
            Methods[major][minor] is a dict keyed by count code name whose
            value is the name of the extraction method for that code or None
            when no supporting method is registered

    Attributes:
        ims (bytearray): incoming message stream
        framed (bool): True means stream is packet framed
        piped (bool): True means use pipeline processor to process
            whenever stream includes pipelineable group count codes.
        kvy (Kevery): route KEL message types to this instance
        tvy (Tevery): route TEL message types to this instance
        exc (Exchanger): route EXN message types to this instance
        rvy (Revery): reply (RPY) message handler
        vry (Verifier): credential verifier with wallet storage
        local (bool): True means event source is local (protected) for validation
                      False means event source is remote (unprotected) for validation

    Properties:
        genus (str): genus portion of default CESR code table protocol genus code
        version (Versionage): current CESR protocol genus version in context
        methods (dict): method names for counter extraction, keyed by count
            code name
        codes (CtrDex): selected by .version from (CtrDex_1_0, CtrDex_2_0)
        sucodes (SUDex): selected by .version from (SUDex_1_0, SUDex_2_0)
        mucodes (MUDex): selected by .version from (MUDex_1_0, MUDex_2_0)

    Hidden:
        _version (Versionage): value for .version property
        _genus (str): value for .genus property
        _methods (dict): value for .methods property
        _codes (CtrDex): value for .codes property
        _sucodes (SUDex): value for .sucodes property
        _mucodes (MUDex): value for .mucodes property
    """
    Codes = Counter.Codes  # code tables from Counter
    SUCodes = Counter.SUCodes  # special universal code tables from Counter
    MUCodes = Counter.MUCodes  # message universal code tables from Counter
    Methods = copy.deepcopy(Counter.Codes)  # make deep copy so not mutate Counter
    # NOTE: the loop names below (minor, key) stay bound in the class
    # namespace after the loop finishes since this runs in the class body.
    for minor in Methods.values():  # assign None as default val for all possible code names
        for key in minor:
            # replace the code table entry with a dict of code name -> None
            minor[key] = {key: None for key in asdict(minor[key])}

    # reassign method name val for those code names that have a supporting method
    # Indexing is Methods[major][minor][code name] = method name
    Methods[1][0][Codens.ControllerIdxSigs] = "_ControllerIdxSigs1"
    Methods[2][0][Codens.ControllerIdxSigs] = "_ControllerIdxSigs2"
    Methods[2][0][Codens.BigControllerIdxSigs] = "_ControllerIdxSigs2"

    Methods[1][0][Codens.WitnessIdxSigs] = "_WitnessIdxSigs1"
    Methods[2][0][Codens.WitnessIdxSigs] = "_WitnessIdxSigs2"
    Methods[2][0][Codens.BigWitnessIdxSigs] = "_WitnessIdxSigs2"

    Methods[1][0][Codens.NonTransReceiptCouples] = "_NonTransReceiptCouples1"
    Methods[2][0][Codens.NonTransReceiptCouples] = "_NonTransReceiptCouples2"
    Methods[2][0][Codens.BigNonTransReceiptCouples] = "_NonTransReceiptCouples2"

    Methods[1][0][Codens.TransReceiptQuadruples] = "_TransReceiptQuadruples1"
    Methods[2][0][Codens.TransReceiptQuadruples] = "_TransReceiptQuadruples2"
    Methods[2][0][Codens.BigTransReceiptQuadruples] = "_TransReceiptQuadruples2"

    Methods[1][0][Codens.TransIdxSigGroups] = "_TransIdxSigGroups1"
    Methods[2][0][Codens.TransIdxSigGroups] = "_TransIdxSigGroups2"
    Methods[2][0][Codens.BigTransIdxSigGroups] = "_TransIdxSigGroups2"

    Methods[1][0][Codens.TransLastIdxSigGroups] = "_TransLastIdxSigGroups1"
    Methods[2][0][Codens.TransLastIdxSigGroups] = "_TransLastIdxSigGroups2"
    Methods[2][0][Codens.BigTransLastIdxSigGroups] = "_TransLastIdxSigGroups2"

    Methods[1][0][Codens.FirstSeenReplayCouples] = "_FirstSeenReplayCouples1"
    Methods[2][0][Codens.FirstSeenReplayCouples] = "_FirstSeenReplayCouples2"
    Methods[2][0][Codens.BigFirstSeenReplayCouples] = "_FirstSeenReplayCouples2"

    # pathed material shares one method across both major versions
    Methods[1][0][Codens.PathedMaterialCouples] = "_PathedMaterialCouples"
    Methods[1][0][Codens.BigPathedMaterialCouples] = "_PathedMaterialCouples"
    Methods[2][0][Codens.PathedMaterialCouples] = "_PathedMaterialCouples"
    Methods[2][0][Codens.BigPathedMaterialCouples] = "_PathedMaterialCouples"

    Methods[1][0][Codens.SealSourceTriples] = "_SealSourceTriples1"
    Methods[2][0][Codens.SealSourceTriples] = "_SealSourceTriples2"
    Methods[2][0][Codens.BigSealSourceTriples] = "_SealSourceTriples2"

    Methods[1][0][Codens.SealSourceCouples] = "_SealSourceCouples1"
    Methods[2][0][Codens.SealSourceCouples] = "_SealSourceCouples2"
    Methods[2][0][Codens.BigSealSourceCouples] = "_SealSourceCouples2"

    # registered for major version 2 only
    Methods[2][0][Codens.TypedDigestSealCouples] = "_TypedDigestSealCouples"
    Methods[2][0][Codens.BigTypedDigestSealCouples] = "_TypedDigestSealCouples"

    Methods[1][0][Codens.ESSRPayloadGroup] = "_ESSRPayloadGroup1"
    Methods[1][0][Codens.BigESSRPayloadGroup] = "_ESSRPayloadGroup1"
    Methods[2][0][Codens.ESSRPayloadGroup] = "_ESSRPayloadGroup2"
    Methods[2][0][Codens.BigESSRPayloadGroup] = "_ESSRPayloadGroup2"

    # registered for major version 2 only
    Methods[2][0][Codens.BlindedStateQuadruples] = "_BlindedStateQuadruples"
    Methods[2][0][Codens.BigBlindedStateQuadruples] = "_BlindedStateQuadruples"

    Methods[2][0][Codens.BoundStateSextuples] = "_BoundStateSextuples"
    Methods[2][0][Codens.BigBoundStateSextuples] = "_BoundStateSextuples"

    Methods[2][0][Codens.TypedMediaQuadruples] = "_TypedMediaQuadruples"
    Methods[2][0][Codens.BigTypedMediaQuadruples] = "_TypedMediaQuadruples"
def __init__(self, ims=None, framed=True, piped=False, kvy=None, tvy=None,
             exc=None, rvy=None, vry=None, local=False, version=Vrsn_2_0):
    """Set up parser state and default message routing.

    Parameters:
        ims (bytearray): incoming message stream. A new empty bytearray is
            created when None.
        framed (bool): True means ims contains only one msg body plus
            its foot of attachments, not multiple sets of msg body plus foot
        piped (bool): True means use pipeline processor to process
            ims msgs when stream includes pipelined count codes.
        kvy (Kevery): route KEL message types to this instance
        tvy (Tevery): route TEL message types to this instance
        exc (Exchanger): route EXN message types to this instance
        rvy (Revery): reply (RPY) message handler
        vry (Verifier): credential verifier with wallet storage
        local (bool): True means event source is local (protected) for validation
                      False means event source is remote (unprotected) for validation
        version (Versionage): instance of version portion of genus
            version code for default code table
    """
    if ims is None:
        ims = bytearray()
    self.ims = ims

    # normalize flag arguments to real booleans
    self.framed = bool(framed)  # extract until end-of-stream
    self.piped = bool(piped)  # use pipeline processor

    # message handlers are stored as given
    self.kvy, self.tvy, self.exc = kvy, tvy, exc
    self.rvy, self.vry = rvy, vry

    self.local = bool(local)
    self._genus = GenDex.KERI  # only supports KERI

    # Assign through the .version property so its setter runs.
    # provided version may be earlier than supported version
    self.version = version
# Assigning .version sets .methods, .codes, .sucodes, and .mucodes

@property
def genus(self):
    """Read-only access to the genus.

    Returns:
        genus (str): value of ._genus
    """
    return self._genus


@property
def version(self):
    """Current default version from genus-version code.

    Returns:
        version (Versionage): value of ._version
    """
    return self._version


@version.setter
def version(self, version):
    """Property setter for .version

    Validates the major and minor parts against .Methods and then selects
    the method and code tables of the latest supported minor version of the
    given major version.

    Parameters:
        version (Versionage|None): version portion of genus-version code.
            If None do nothing.

    Raises:
        InvalidVersionError: when the major version has no entry in .Methods
            or the minor version exceeds the latest supported minor version.
    """
    if version is None:
        return  # leave current version and tables untouched

    major = version.major
    if major not in self.Methods:
        raise InvalidVersionError(f"Unsupported major version="
                                  f"{version.major}.")

    # last key of the per-major table is the latest supported minor version
    latest = list(self.Methods[major])[-1]
    if version.minor > latest:
        raise InvalidVersionError(f"Minor version={version.minor} "
                                  f" exceeds latest supported minor"
                                  f" version={latest}.")

    self._version = version
    # tables are always those of the latest minor, not of version.minor
    self._methods = self.Methods[major][latest]
    self._codes = self.Codes[major][latest]
    self._sucodes = self.SUCodes[major][latest]
    self._mucodes = self.MUCodes[major][latest]


@property
def methods(self):
    """Method name table for the current .version in stream context.

    Returns:
        methods (dict): method names for counter extraction, keyed by count
            code name
    """
    return self._methods


@property
def codes(self):
    """Read-only access to the count code table.

    Returns:
        codes (CtrDex): selected by .version from (CtrDex_1_0, CtrDex_2_0)
    """
    return self._codes


@property
def sucodes(self):
    """Read-only access to the special universal code table.

    Returns:
        sucodes (SUDex): selected by .version from (SUDex_1_0, SUDex_2_0)
    """
    return self._sucodes


@property
def mucodes(self):
    """Read-only access to the message universal code table.

    Returns:
        mucodes (MUDex): selected by .version from (MUDex_1_0, MUDex_2_0)
    """
    return self._mucodes
def extract(self, ims, klas, cold=Colds.txt):
    """Build and return one klas instance from the front of ims.

    The instance is created with strip=True and the parser's current
    .version. The stream state, cold, decides whether ims is handed to klas
    as qb64b (text) or qb2 (binary).

    Parameters:
        ims (bytearray): input message stream (must be strippable)
        klas (Serder | Counter | Matter | Indexer): subclass that is parsable
        cold (Coldage): instance str value

    Returns:
        instance (klas): instance created from ims

    Raises:
        ColdStartError: when cold is neither Colds.txt nor Colds.bny
    """
    if cold == Colds.txt:
        source = dict(qb64b=ims)  # text domain
    elif cold == Colds.bny:
        source = dict(qb2=ims)  # binary domain
    else:
        raise ColdStartError(f"Invalid stream state {cold=}")

    return klas(strip=True, version=self.version, **source)
def _extractor(self, ims, klas, cold=Colds.txt, abort=False, strip=True):
    """Returns generator that extracts and returns an instance of klas from
    input message stream, ims, given stream state, cold, is txt or bny.

    Inits klas from ims using qb64b or qb2 parameter based on cold.
    When klas raises ShortageError because ims does not yet hold enough
    bytes, the generator either re-raises (abort is True) or yields and
    retries on the next iteration (abort is False).

    Parameters:
        ims (bytearray): input message stream (must be strippable)
        klas (Serder | Counter | Matter | Indexer): subclass that is parsable
        cold (Coldage): instance str value
        abort (bool): True means abort if bad pipelined frame Shortage
                      False means do not abort if Shortage just wait for more
        strip (bool): True means strip extracted instance from ims
                      False means do not strip, so can peek at stream

    Returns:
        instance (klas): via the generator return value

    Raises:
        ColdStartError: when cold is neither Colds.txt nor Colds.bny
        ShortageError: when abort is True and ims is too short

    Usage:
        instance = yield from self._extractor(ims=ims, klas=Counter)
    """
    # cold never changes while waiting so pick the keyword once up front
    if cold == Colds.txt:
        keyword = "qb64b"
    elif cold == Colds.bny:
        keyword = "qb2"
    else:
        raise ColdStartError(f"Invalid stream state {cold=}")

    while True:
        try:
            return klas(strip=strip, version=self.version, **{keyword: ims})
        except ShortageError:
            if abort:  # pipelined pre-collects full frame before extracting
                raise  # bad pipelined frame so abort by raising error
            yield  # wait for more bytes then retry
def parse(self, ims=None, framed=None, piped=None, kvy=None, tvy=None,
          exc=None, rvy=None, vry=None, local=None, version=None):
    """Run .allParsator to completion over a fixed (not live) stream.

    Processes all messages from incoming message stream, ims, when provided,
    otherwise from .ims. Returns once the generator is exhausted.

    Parameters:
        ims (bytearray): incoming message stream. May contain one or more
            sets each of a serialized message with attached cryptographic
            material such as signatures or receipts.
        framed (bool): True means ims contains only one frame of msg plus
            counted attachments instead of stream with multiple messages
        piped (bool): True means use pipeline processor to process
            ims msgs when stream includes pipelineable count codes.
        kvy (Kevery): route KERI KEL message types to this instance
        tvy (Tevery): route TEL message types to this instance
        exc (Exchanger): route EXN message types to this instance
        rvy (Revery): reply (RPY) message handler
        vry (Verifier): credential verifier with wallet storage
        local (bool): True means event source is local (protected) for validation
                      False means event source is remote (unprotected) for validation
                      None means use default .local
        version (Versionage): default version of CESR to use
                              None means do not change default

    New Logic:
        Attachments must all have counters so know if txt or bny format for
        attachments. So even when framed==True must still have counters.
    """
    if local is None:
        local = self.local
    local = bool(local)

    # drain the generator; its yields carry no values of interest here
    for _ in self.allParsator(ims=ims,
                              framed=framed,
                              piped=piped,
                              kvy=kvy,
                              tvy=tvy,
                              exc=exc,
                              rvy=rvy,
                              vry=vry,
                              local=local,
                              version=version):
        pass
def parseOne(self, ims=None, framed=True, piped=False, kvy=None, tvy=None,
             exc=None, rvy=None, vry=None, local=None, version=None):
    """Run .onceParsator to completion over a fixed (not live) stream.

    Processes one message from incoming message stream, ims, when provided,
    otherwise from .ims. Returns once the generator is exhausted.

    Parameters:
        ims (bytearray): serialized incoming message stream.
            May contain one or more sets each of a serialized message with
            attached cryptographic material such as signatures or receipts.
        framed (bool): True means ims contains only one frame of msg plus
            counted attachments instead of stream with multiple messages
        piped (bool): True means use pipeline processor to process
            ims msgs when stream includes pipelineable count codes.
        kvy (Kevery): route KERI KEL message types to this instance
        tvy (Tevery): route TEL message types to this instance
        exc (Exchanger): route EXN message types to this instance
        rvy (Revery): reply (RPY) message handler
        vry (Verifier): passed through unchanged to .onceParsator
        local (bool): True means event source is local (protected) for validation
                      False means event source is remote (unprotected) for validation
                      None means use default .local
        version (Versionage): default genera version of CESR to use
                              None means do not change default

    New Logic:
        Attachments must all have counters so know if txt or bny format for
        attachments. So even when framed==True must still have counters.
    """
    if local is None:
        local = self.local
    local = bool(local)

    # drain the generator; its yields carry no values of interest here
    for _ in self.onceParsator(ims=ims,
                               framed=framed,
                               piped=piped,
                               kvy=kvy,
                               tvy=tvy,
                               exc=exc,
                               rvy=rvy,
                               vry=vry,
                               local=local,
                               version=version):
        pass
def allParsator(self, ims=None, framed=None, piped=None, kvy=None, tvy=None,
                exc=None, rvy=None, vry=None, local=None, version=None):
    """Returns generator to parse all messages from incoming message stream,
    ims, until ims is exhausted (empty) then returns.

    Each pass of the loop delegates to .groupParsator and then yields once.
    If ims not provided then parse messages from .ims. Any other argument
    given as None falls back to the instance attribute of the same name,
    except version which is passed through as given.

    Parameters:
        ims (bytearray): of incoming message stream. May contain one or more
            sets each of a serialized message with attached cryptographic
            material such as signatures or receipts. A non-bytearray value
            is copied into a new bytearray.
        framed (bool): True means ims contains only one frame of msg plus
            counted attachments instead of stream with multiple messages
        piped (bool): True means use pipeline processor to process
            ims msgs when stream includes pipelineable count codes.
        kvy (Kevery): route KERI KEL message types to this instance
        tvy (Tevery): route TEL message types to this instance
        exc (Exchanger): route EXN message types to this instance
        rvy (Revery): reply (RPY) message handler
        vry (Verifier): credential verifier with wallet storage
        local (bool): True means event source is local (protected) for validation
                      False means event source is remote (unprotected) for validation
                      None means use default .local
        version (Versionage): default version of CESR to use
                              None means do not change default

    Returns:
        done (bool): always True, as the generator return value

    New Logic:
        Attachments must all have counters so know if txt or bny format for
        attachments. So even when framed==True must still have counters.
    """
    if ims is None:
        ims = self.ims  # use instance attribute by default
    elif not isinstance(ims, bytearray):
        # needs bytearray not bytes since deletes as processes
        ims = bytearray(ims)  # so make bytearray copy

    if framed is None:
        framed = self.framed
    if piped is None:
        piped = self.piped
    if kvy is None:
        kvy = self.kvy
    if tvy is None:
        tvy = self.tvy
    if exc is None:
        exc = self.exc
    if rvy is None:
        rvy = self.rvy
    if vry is None:
        vry = self.vry
    if local is None:
        local = self.local
    local = bool(local)

    while ims:  # only process until ims empty (differs here from parsator)
        try:
            yield from self.groupParsator(ims=ims,
                                          framed=framed,
                                          piped=piped,
                                          kvy=kvy,
                                          tvy=tvy,
                                          exc=exc,
                                          rvy=rvy,
                                          vry=vry,
                                          local=local,
                                          version=version)

        except SizedGroupError as err:  # error inside sized group
            # sized group was already removed from stream so do not flush
            report = (logger.exception if logger.isEnabledFor(logging.DEBUG)
                      else logger.error)
            report("Parser sized group error: %s", err.args[0])

        except (ColdStartError, ExtractionError) as err:  # some extraction error
            report = (logger.exception if logger.isEnabledFor(logging.DEBUG)
                      else logger.error)
            report("Parser msg extraction error: %s", err.args[0])
            del ims[:]  # delete rest of stream to force cold restart

        except (ValidationError, Exception) as err:  # non Extraction Error
            # Non extraction errors happen after successfully extracted from
            # stream so we don't flush rest of stream just resume
            if logger.isEnabledFor(logging.TRACE):
                logger.exception("Parser msg non-extraction error: %s", err)
            if logger.isEnabledFor(logging.DEBUG):
                logger.error("Parser msg non-extraction error: %s", err)

        yield  # hand control back once per pass

    return True
def onceParsator(self, ims=None, framed=None, piped=None, kvy=None, tvy=None,
                 exc=None, rvy=None, vry=None, local=None, version=None):
    """Returns generator to parse one message from incoming message stream, ims.

    Delegates once to .msgParsator. Errors raised by the delegate are logged
    and swallowed; the generator then returns. If ims not provided parse
    messages from .ims. Any other argument given as None falls back to the
    instance attribute of the same name, except version which is passed
    through as given.

    Parameters:
        ims (bytearray): incoming message stream. May contain one or more
            sets each of a serialized message with attached cryptographic
            material such as signatures or receipts. A non-bytearray value
            is copied into a new bytearray.
        framed (bool): True means ims contains only one frame of msg plus
            counted attachments instead of stream with multiple messages
        piped (bool): True means use pipeline processor to process
            ims msgs when stream includes pipelineable count codes.
        kvy (Kevery): route KERI KEL message types to this instance
        tvy (Tevery): route TEL message types to this instance
        exc (Exchanger): route EXN message types to this instance
        rvy (Revery): reply (RPY) message handler
        vry (Verifier): credential verifier with wallet storage
        local (bool): True means event source is local (protected) for validation
                      False means event source is remote (unprotected) for validation
                      None means use default .local
        version (Versionage): default version of CESR to use
                              None means do not change default

    Returns:
        done (bool): always True, as the generator return value

    New Logic:
        Attachments must all have counters so know if txt or bny format for
        attachments. So even when framed==True must still have counters.
    """
    if ims is not None:  # needs bytearray not bytes since deletes as processes
        if not isinstance(ims, bytearray):
            ims = bytearray(ims)  # so make bytearray copy
    else:
        ims = self.ims  # use instance attribute by default

    framed = framed if framed is not None else self.framed
    piped = piped if piped is not None else self.piped
    kvy = kvy if kvy is not None else self.kvy
    tvy = tvy if tvy is not None else self.tvy
    exc = exc if exc is not None else self.exc
    rvy = rvy if rvy is not None else self.rvy
    vry = vry if vry is not None else self.vry
    local = local if local is not None else self.local
    local = True if local else False

    done = False
    while not done:
        try:
            done = yield from self.msgParsator(ims=ims,
                                               framed=framed,
                                               piped=piped,
                                               kvy=kvy,
                                               tvy=tvy,
                                               exc=exc,
                                               rvy=rvy,
                                               vry=vry,
                                               local=local,
                                               version=version)

        # Log labels say "Parser" since these errors are reported by the
        # parser itself, matching the labels used by the other parsators.
        except SizedGroupError as ex:  # error inside sized group
            # do not flush stream for an error inside a sized group
            if logger.isEnabledFor(logging.DEBUG):
                logger.exception("Parser sized group error: %s", ex.args[0])
            else:
                logger.error("Parser sized group error: %s", ex.args[0])

        except (ColdStartError, ExtractionError) as ex:  # some extraction error
            if logger.isEnabledFor(logging.DEBUG):
                logger.exception("Parser msg extraction error: %s", ex.args[0])
            else:
                logger.error("Parser msg extraction error: %s", ex.args[0])
            del ims[:]  # delete rest of stream to force cold restart

        except (ValidationError, Exception) as ex:  # non Extraction Error
            # Non extraction errors happen after successfully extracted from
            # stream so we don't flush rest of stream just resume
            if logger.isEnabledFor(logging.TRACE):
                logger.exception("Parser msg non-extraction error: %s", ex)
            if logger.isEnabledFor(logging.DEBUG):
                logger.error("Parser msg non-extraction error: %s", ex)

        finally:
            done = True  # exactly one pass whether or not an error occurred

    return done
def parsator(self, ims=None, framed=None, piped=None, kvy=None, tvy=None,
             exc=None, rvy=None, vry=None, local=None, version=None):
    """Returns generator to continually parse messages from incoming message
    stream, ims. Does not return. Useful for always running servers.

    Each pass of the loop delegates to .groupParsator and then yields once,
    so the generator keeps yielding while ims is empty. If ims not provided
    then parse messages from .ims. Any other argument given as None falls
    back to the instance attribute of the same name, except version which is
    passed through as given.

    Parameters:
        ims (bytearray): incoming message stream. May contain one or more
            sets each of a serialized message with attached cryptographic
            material such as signatures or receipts. A non-bytearray value
            is copied into a new bytearray.
        framed (bool): True means ims contains only one frame of msg plus
            counted attachments instead of stream with multiple messages
        piped (bool): True means use pipeline processor to process
            ims msgs when stream includes pipelineable count codes.
        kvy (Kevery): route KERI KEL message types to this instance
        tvy (Tevery): route TEL message types to this instance
        exc (Exchanger): route EXN message types to this instance
        rvy (Revery): reply (RPY) message handler
        vry (Verifier): credential processor
        local (bool): True means event source is local (protected) for validation
                      False means event source is remote (unprotected) for validation
                      None means use default .local
        version (Versionage): default version of CESR to use
                              None means do not change default

    New Logic:
        Attachments must all have counters so know if txt or bny format for
        attachments. So even when framed==True must still have counters.
    """
    if ims is not None:  # needs bytearray not bytes since deletes as processes
        if not isinstance(ims, bytearray):
            ims = bytearray(ims)  # so make bytearray copy
    else:
        ims = self.ims  # use instance attribute by default

    framed = framed if framed is not None else self.framed
    piped = piped if piped is not None else self.piped
    kvy = kvy if kvy is not None else self.kvy
    tvy = tvy if tvy is not None else self.tvy
    exc = exc if exc is not None else self.exc
    rvy = rvy if rvy is not None else self.rvy
    vry = vry if vry is not None else self.vry
    local = local if local is not None else self.local
    local = True if local else False

    while True:  # continuous stream processing (differs here from allParsator)
        try:
            done = yield from self.groupParsator(ims=ims,
                                                 framed=framed,
                                                 piped=piped,
                                                 kvy=kvy,
                                                 tvy=tvy,
                                                 exc=exc,
                                                 rvy=rvy,
                                                 vry=vry,
                                                 local=local,
                                                 version=version)

        except SizedGroupError as ex:  # error inside sized group
            # sized group was already removed from stream so do not flush
            if logger.isEnabledFor(logging.DEBUG):
                logger.exception("Parser sized group error: %s", ex.args[0])
            else:
                logger.error("Parser sized group error: %s", ex.args[0])

        except (ColdStartError, ExtractionError) as ex:  # some extraction error
            if logger.isEnabledFor(logging.DEBUG):
                logger.exception("Parser msg extraction error: %s", ex.args[0])
            else:
                logger.error("Parser msg extraction error: %s", ex.args[0])
            del ims[:]  # delete rest of stream to force cold restart

        except (ValidationError, Exception) as ex:  # non Extraction Error
            # Non extraction errors happen after successfully extracted from
            # stream so we don't flush rest of stream just resume.
            # Log the exception itself rather than ex.args[0]: an arbitrary
            # exception may carry no args, and an IndexError raised here
            # would terminate this never-ending generator.
            if logger.isEnabledFor(logging.TRACE):
                logger.exception("Parser msg non-extraction error: %s", ex)
            if logger.isEnabledFor(logging.DEBUG):
                logger.error("Parser msg non-extraction error: %s", ex)

        yield  # hand control back once per pass

    return True  # should never return
def groupParsator(self, ims=None, framed=True, piped=False, kvy=None,
                  tvy=None, exc=None, rvy=None, vry=None, local=None,
                  version=None):
    """Returns generator to parse nested GenericGroups whose outermost nesting
    appears at the top-level of an incoming message stream.

    Whenever a GenericGroup counter is at the front of the current stream
    its content is copied out as a substream, the current (version, stream)
    pair is pushed on a stack, and parsing continues on the substream. Once
    a substream is exhausted the stack is popped to resume the enclosing
    stream. Everything else is delegated to .msgParsator.
    If ims not provided then parse messages from .ims

    Parameters:
        ims (bytearray): of incoming message stream. May contain one or more
            sets each of a serialized message with attached cryptographic
            material such as signatures or receipts.
        framed (bool): True means ims contains only one frame of msg plus
            counted attachments instead of stream with multiple messages
        piped (bool): True means use pipeline processor to process
            ims msgs when stream includes pipelineable count codes.
        kvy (Kevery): route KERI KEL message types to this instance
        tvy (Tevery): route TEL message types to this instance
        exc (Exchanger): route EXN message types to this instance
        rvy (Revery): reply (RPY) message handler
        vry (Verifier): credential verifier with wallet storage
        local (bool): True means event source is local (protected) for validation
                      False means event source is remote (unprotected) for validation
                      None means use default .local
        version (Versionage): default version of CESR to use
                              None means do not change default

    Returns:
        done (bool | None): result of the last completed .msgParsator call,
            False when none completed, None when piped short-circuits

    Raises:
        SizedGroupError: extraction error after a generic group was already
            stripped from the stream
        ExtractionError: extraction error with no enclosing generic group
    """
    if ims is None:
        ims = self.ims

    local = local if local is not None else self.local
    local = True if local else False

    self.version = version  # when not None which sets .methods .codes .mucodes .sucodes
    stack = deque()  # (svrsn, ims) stack of nested substreams framed by generic groups
    svrsn = None
    eggs = None  # size of enclosing generic group, used in preflushed error
    done = False

    try:
        while True:  # process stream until done
            while not ims and stack:  # happens when ascending (un-nesting)
                svrsn, ims = stack.pop()  # un-nest
                self.version = svrsn  # only changes if svrsn is not None

            if not ims:  # no stream and no stack
                break

            # check front of stream for GenericGroup to nest down
            cold = sniff(ims)  # check front of stream
            if cold != Colds.msg:  # peek for generic group at front of ims
                ctr = yield from self._extractor(ims=ims,
                                                 klas=Counter,
                                                 cold=cold,
                                                 abort=framed,
                                                 strip=False)
                if (ctr.code in (self.sucodes.GenericGroup,
                                 self.sucodes.BigGenericGroup)):
                    del ims[:ctr.byteSize(cold=cold)]  # consume ctr itself
                    # compute enclosing generic group size based on txt or bny
                    eggs = ctr.byteCount(cold=cold)
                    while len(ims) < eggs and not framed:  # framed already in ims
                        yield

                    eims = ims[:eggs]  # copy out substream enclosed attachments
                    del ims[:eggs]  # strip off from ims
                    # peek for version code at front of eims
                    ctr = yield from self._extractor(ims=eims,
                                                     klas=Counter,
                                                     cold=cold,
                                                     abort=framed,
                                                     strip=False)
                    if ctr.code == self.codes.KERIACDCGenusVersion:
                        del eims[:ctr.byteSize(cold=cold)]  # consume ctr itself
                        # get new substream version
                        svrsn = Counter.b64ToVer(ctr.countToB64(l=3))
                    else:
                        # Must be an assignment: a leftover svrsn from an
                        # earlier group or un-nesting would otherwise be
                        # applied to this group below.
                        svrsn = None  # no change to substream version

                    stack.append((self.version, ims))  # push current version ims
                    self.version = svrsn  # replace with new version if not None
                    ims = eims  # replace ims to nest down one level
                    # top-level nested generics enclose messages plus attachments
                    framed = True

                    if piped:
                        pass  # pass extracted ims to pipeline processor
                        return  # pop stack here instead

                    continue  # captures immediate further nested groups

            # process substream at current nesting level
            try:
                done = yield from self.msgParsator(ims=ims,
                                                   framed=framed,
                                                   piped=piped,
                                                   kvy=kvy,
                                                   tvy=tvy,
                                                   exc=exc,
                                                   rvy=rvy,
                                                   vry=vry,
                                                   local=local,
                                                   version=self.version)

            except TopLevelStreamError as ex:  # encountered GenericGroup
                continue  # so returns control here to parse that group

            except (ValidationError, Exception) as ex:  # non Extraction Error
                # Non extraction errors happen after a message has been
                # successfully extracted from stream
                # so we don't flush rest of stream just resume
                # NOTE(review): catching Exception here presumably also
                # swallows extraction errors raised by msgParsator so they
                # never reach the handler below -- confirm intended.
                continue

    except ExtractionError as ex:  # maybe this needs to be more granular
        if eggs is not None:  # extracted enclosed message group is preflushed
            raise SizedGroupError(f"Error processing generic group"
                                  f" of size={eggs}") from ex
        raise  # no enclosing message group so can't preflush, must flush stream

    finally:  # restore version at top level
        while stack:  # when tail end of group has validation error
            svrsn, _ = stack.pop()
            if svrsn:
                self.version = svrsn

    return done
[docs] def msgParsator(self, ims=None, framed=True, piped=False, kvy=None, tvy=None, exc=None, rvy=None, vry=None, local=None, version=None): """Returns generator that upon each iteration extracts and parses msg with attached crypto material (signature etc) from incoming message stream, ims, and dispatches processing of message with attachments. Uses .ims when ims is not provided. Iterator yields when not enough bytes in ims to finish one msg plus attachments. Returns (which raises StopIteration) when finished. Parameters: ims (bytearray): serialized incoming message stream. May contain one or more sets each of a serialized message with attached cryptographic material such as signatures or receipts. framed (bool): True means ims contains only one frame of msg plus counted attachments instead of stream with multiple messages piped (bool): True means use pipeline processor to process ims msgs when stream includes pipelineable count codes. kvy (Kevery): route KERI KEL message types to this instance tvy (Tevery): route TEL message types to this instance exc (Exchanger): route EXN message types to this instance rvy (Revery): reply (RPY) message handler vry (Verifier): ACDC credential processor local (bool): True means event source is local (protected) for validation False means event source is remote (unprotected) for validation None means use default .local version (``Versionage``): default version of CESR to use. None means do not change default Logic:: Currently only support couters on attachments not on combined or on message Attachments must all have counters so know if txt or bny format for attachments. So even when framed==True must still have counter. 
Do While loop sniff to set up first extraction raise exception and flush full tream if stream start is counter must be message extract message sniff for counter if group counter extract and discard but keep track of count so if error while processing attachments then only need to flush attachment count not full stream. """ if ims is None: ims = self.ims local = local if local is not None else self.local local = True if local else False self.version = version # when not None which sets .codes .mucodes. .sucodes verstack = deque() # version stack append and pop # create exts (extracts) keyword args dict with fields: # serder (Serder): message instance # sigers (list[Siger]): attached indexed controller signatures # wigers (list[Siger]): attached indexed witness signatures # cigars (list[Cigar]): attached non-transferable from couple (verfer, sig) # trqs (list[tuple]): (prefixer, number, diger, siger) # tsgs (list[tuple]): (prefixer, number, diger, [Sigers]) triple plus list of sigs # ssgs (list[tuple]): (prefixer,[Sigers]) single plus list of sigs # frcs (list[tuple]): (number, dater) # sscs (list[tuple]): (number, diger) issuing or delegating # ssts (list[tuple]): (prefixer, number, diger) issued or delegated # tdcs (list[tuple]): (verser, diger) SealKind TypedDigestSealCouples # ptds (list[bytes]): pathed streams # essrs (list[Texter]): essr encapsulations as Texters # bsqs (list[tuple]): (diger, noncer, noncer, labeler) BlindState # bsss (list[tuple]): (diger, noncer, noncer, labeler, number, noncer) BoundState # tmqs (list[tuple]): (diger, noncer, labeler, texter) TypeMedia # local (bool): True if local source controller context for processing exts = dict(serder=None, sigers=[], wigers=[], cigars=[], trqs=[], tsgs=[], ssgs=[], frcs=[], sscs=[], ssts=[], tdcs=[], ptds=[], essrs=[], bsqs=[], bsss=[], tmqs=[], local=local) serdery = Serdery(version=Version) try: while not ims and not framed: yield emgs = None # size of enclosing message group if any when is not None 
ctr = None # no counter to process when not None then extracted need to process # Check for genus-version change # Note: Genus-Version Counter count code and format is universal # accross all genera and all versions of all genera. Therefore any # version counter should work for parsing stream genus version cold = sniff(ims) # front of top level of this substream if cold != Colds.msg: # counter found so peek at it ctr = yield from self._extractor(ims=ims, klas=Counter, cold=cold, abort=framed, strip=False) if ctr.code == self.codes.KERIACDCGenusVersion: del ims[:ctr.byteSize(cold=cold)] # consume ctr itself # change version at top level this persists is not stacked self.version = Counter.b64ToVer(ctr.countToB64(l=3)) # check for BodyWithAttachmentGroup or non-native message or native message groups cold = sniff(ims) # front of top level of this substream if cold != Colds.msg: # counter found so peek at it ctr = yield from self._extractor(ims=ims, klas=Counter, cold=cold, abort=framed, strip=False) if ctr and ctr.code in (self.sucodes.BodyWithAttachmentGroup, self.sucodes.BigBodyWithAttachmentGroup): del ims[:ctr.byteSize(cold=cold)] # consume ctr itself # compute enclosing group size based on txt or bny emgs = ctr.byteCount(cold=cold) while len(ims) < emgs and not framed: # framed already in ims yield eims = ims[:emgs] # copy out substream enclosed attachments del ims[:emgs] # strip off from ims ims = eims # replace since message group includes attachments framed = True # since includes attachments so pre-extracted if piped: pass # pass extracted ims to pipeline processor return # peek for version ctr = yield from self._extractor(ims=ims, klas=Counter, cold=cold, abort=framed, strip=False) if ctr.code == self.codes.KERIACDCGenusVersion: del ims[:ctr.byteSize(cold=cold)] # consume ctr itself # change version verstack.append(self.version) # push current onto stack self.version = Counter.b64ToVer(ctr.countToB64(l=3)) # peek at next counter either native or non-native 
msg group ctr = yield from self._extractor(ims=ims, klas=Counter, cold=cold, abort=framed, strip=False) # Check for message groups if (ctr.code in (self.mucodes.NonNativeBodyGroup, self.mucodes.BigNonNativeBodyGroup)): del ims[:ctr.byteSize(cold=cold)] # consume ctr itself # process non-native message group with texter texter = yield from self._extractor(ims=ims, klas=Texter, cold=cold, abort=framed) serder = serdery.reap(ims=texter.raw, genus=self.genus, svrsn=self.version) exts['serder'] = serder elif (ctr.code in (self.mucodes.FixBodyGroup, self.mucodes.BigFixBodyGroup)): # native fixed field cbs = ctr.byteSize(cold=cold) # counter size of counter itself fmgs = ctr.byteCount(cold=cold) # fixed body content size size = cbs + fmgs # size of ctr and its content while len(ims) < size and not framed: # framed already in ims yield # until full ctr and its content in ims fims = ims[:size] # copy out ctr and its content del ims[:size] # strip off from ims if cold == Colds.bny: # tranform to text domain fims = encodeB64(fims) # always process event in qb64 text domain size = (size * 4) // 3 # fims includes full body with counter but no attachments serder = serdery.reap(ims=fims, genus=self.genus, svrsn=self.version, ctr=ctr, size=size, fixed=True) exts['serder'] = serder elif (ctr.code in (self.mucodes.MapBodyGroup, self.mucodes.BigMapBodyGroup)): # native field map cbs = ctr.byteSize(cold=cold) # counter size of counter itself mmgs = ctr.byteCount(cold=cold) # fixed body group size size = cbs + mmgs while len(ims) < size and not framed: # framed already in ims yield # until full ctr and its content in ims mims = ims[:size] # copy out ctr and its content del ims[:size] # strip off from ims if cold == Colds.bny: # tranform to text domain mims = encodeB64(mims) # always process event in qb64 text domain size = (size * 4) // 3 # mims includes ctr and its content but no attachments serder = serdery.reap(ims=mims, genus=self.genus, svrsn=self.version, ctr=ctr, size=size, 
fixed=False) exts['serder'] = serder elif (ctr.code in (self.sucodes.GenericGroup, self.sucodes.BigGenericGroup)): # return control to groupParsator raise TopLevelStreamError(f"Got GenericGroup so revisit.") else: # shouldn't be a counter of any other type here raise ColdStartError(f"Expected message counter code," f" got code={ctr.code}") else: # Otherwise its JSON, CBOR, or MGPK message at top level while True: # extract, deserialize, and strip message from ims try: serder = serdery.reap(ims=ims, genus=self.genus, svrsn=self.version) except ShortageError as ex: # need more bytes if framed: # pre-extracted raise # incomplete frame or group so abort by raising error yield else: # extracted and stripped successfully exts['serder'] = serder break # break out of while loop except ExtractionError as ex: if emgs is not None: # extracted enclosed message group is preflushed raise SizedGroupError(f"Error processing enclosing " f"message group of size={emgs}") raise # no enclosing group so can't preflush, must flush stream # Extract and deserialize attachments enclosed = False # True means all attachments enclosed in AttachmentGroup try: # catch errors here to flush only counted part of stream # attachments must start with counter so know if txt or bny. 
# if no attachments MUST have at least empty AttachmentGroup while not ims and not framed: # framed has everything already yield # when not framed at least empty AttachmentGroup follows cold = sniff(ims) if cold != Colds.msg: # counter so peek at what it is ctr = yield from self._extractor(ims=ims, klas=Counter, cold=cold, abort=framed, strip=False) if ctr.code in (self.codes.AttachmentGroup, self.codes.BigAttachmentGroup): del ims[:ctr.byteSize(cold=cold)] # consume ctr itself # compute enclosing attachment group size based on txt or bny eags = ctr.byteCount(cold=cold) while len(ims) < eags and not framed: yield eims = ims[:eags] # copy out substream enclosed attachments del ims[:eags] # strip off from ims consume contents from ims ims = eims # now just process substream as one counted frame enclosed = True if piped: pass # pass extracted ims to pipeline processor return # peek for version change ctr = yield from self._extractor(ims=ims, klas=Counter, cold=cold, abort=enclosed, strip=False) if ctr.code == self.codes.KERIACDCGenusVersion: del ims[:ctr.byteSize(cold=cold)] # consume ctr itself # change version verstack.append(self.version) # push current onto stack self.version = Counter.b64ToVer(ctr.countToB64(l=3)) while True: # iteratively process attachment counters in stride ctr = yield from self._extractor(ims=ims, klas=Counter, cold=cold, abort=framed or enclosed, strip=False) # peek at ctr # check if group belongs to top level group message in stream if (ctr.code in self.mucodes or ctr.code in self.sucodes or ctr.code == self.codes.KERIACDCGenusVersion): # do not consume leave belongs with new msg break # not a valid attachment so done with attachments to this msg del ims[:ctr.byteSize(cold=cold)] # consume ctr itself try: yield from getattr(self, self.methods[ctr.name])(exts=exts, ims=ims, ctr=ctr, cold=cold, abort=(framed or enclosed)) except AttributeError as ex: raise UnexpectedCountCodeError(f"Unsupported count" f" code={ctr.code}") from ex except 
Exception as ex: raise # easier debug with breakpoint here if enclosed: # attachments framed by enclosing AttachmentGroup # inside of group all contents must be same cold .txt # or .bny so no need to sniff for new cold here. if not ims: # end of attachment group break else: # assumes that if attachments are not enclosed that # framed must be true, which means ims, message plus # attachments all provided at once # ims framed in some way, but not by enclosing AttachmentGroup # not all attachments in one enclosing group, each individual # attachment group may switch stream state txt or bny if not ims: # end of frame break cold = sniff(ims) if cold == Colds.msg: # new non-group message so attachments done break # finished attachments since new message #else: # see next msg but with no attachments on current messge # so just proceed to process current message except ExtractionError as ex: if enclosed: # extracted enclosed attachment group is preflushed raise SizedGroupError(f"Error processing attachment group" " of size={eags}") raise # no enclosing attachment group so can't preflush, must flush stream finally: while verstack: # restore version to what it was self.version = verstack.pop() if isinstance(serder, SerderKERI): ilk = serder.ilk # dispatch abased on ilk if ilk in [Ilks.icp, Ilks.rot, Ilks.ixn, Ilks.dip, Ilks.drt]: # event msg firner, dater = exts['frcs'][-1] if exts['frcs'] else (None, None) # use last one if more than one # when present assumes this is source seal of delegating event in delegator's KEL delsner, delsger = exts['sscs'][-1] if exts['sscs'] else (None, None) # use last one if more than one if not exts['sigers']: # sigers: msg = f"Missing attached signature(s) for evt = {serder.ked['d']}" logger.info(msg) logger.debug("Event Body = \n%s\n", serder.pretty()) raise ValidationError(msg) try: exts['firner'] = firner # first seen number exts['dater'] = dater exts['delsner'] = Number(num=delsner.sn) if delsner is not None else None exts['delsger'] = 
delsger kvy.processEvent(**exts) if exts['cigars']: # cigars kvy.processAttachedReceiptCouples(**exts) if exts['trqs']: # trqs kvy.processAttachedReceiptQuadruples(**exts) except AttributeError as ex: msg = f"No kevery to process so dropped msg={serder.said}" logger.info(msg) logger.debug("Event Body = \n%s\n", serder.pretty()) raise ValidationError(msg) from ex elif ilk in [Ilks.rct]: # event receipt msg (nontransferable) if not (exts['cigars'] or exts['wigers'] or exts['tsgs']): # (cigars or wigers or tsgs) msg = f"Missing attached signatures on receipt msg sn={serder.sn} SAID={serder.said}" logger.info(msg) logger.debug("Receipt body=\n%s\n", serder.pretty()) raise ValidationError(msg) try: kvy.processReceipt(**exts) except AttributeError as ex: raise ValidationError(f"No kevery to process so dropped msg" f"= {serder.pretty()}.") from ex elif ilk in (Ilks.rpy,): # reply message if not (exts['cigars'] or exts['tsgs']): # (cigars or tsgs) raise ValidationError(f"Missing attached endorser signature(s) " f"to reply msg = {serder.pretty()}.") try: rvy.processReply(**exts) except AttributeError as ex: raise ValidationError(f"No revery to process so dropped msg" f"= {serder.pretty()}.") from ex elif ilk in (Ilks.qry,): # query message # ToDo neigher kvy.processQuery nor tvy.processQuery actually verify if exts['ssgs']: # use last one if more than one pre, sigers = exts['ssgs'][-1] if exts['ssgs'] else (None, None) exts["source"] = pre exts["sigers"] = sigers else: exts['sigers'] = [] # just in case sigers provided not by ssgs if not (exts['source'] or exts['cigars']): # need one or the other raise ValidationError(f"Missing attached requester " f"source for query" f" msg = {serder.pretty()}.") route = serder.ked["r"] if route in ["logs", "ksn", "mbx"]: try: kvy.processQuery(**exts) except AttributeError as ex: raise ValidationError(f"No kevery to process so " f" dropped msg={serder.pretty()}") from ex except QueryNotFoundError as ex: # catch escrow error and log it if 
logger.isEnabledFor(logging.TRACE): logger.exception("Error processing query = %s", ex) logger.trace("Query Body=\n%s\n", serder.pretty()) else: logger.error("Error processing query = %s", ex) elif route in ["tels", "tsn"]: try: tvy.processQuery(**exts) except AttributeError as ex: raise ValidationError(f"No tevery to process so dropped msg" f"={serder.pretty()}") from ex else: raise ValidationError(f"Invalid resource type {route}" f"so dropped msg={serder.pretty()}.") elif ilk in (Ilks.exn,): if not (exts['cigars'] or exts['tsgs']): raise ValidationError(f"Missing attached exchanger " f"signatures for msg={serder.pretty()}") try: exc.processEvent(**exts) except AttributeError as ex: raise ValidationError(f"No Exchange to process so " f"dropped msg={serder.pretty()}.") from ex elif ilk in (Ilks.vcp, Ilks.vrt, Ilks.iss, Ilks.rev, Ilks.bis, Ilks.brv): # TEL msg # get transaction event seal ref to Issuer's KEL # use last one if more than one number, diger = exts['sscs'][-1] if exts['sscs'] else (None, None) exts['seqner'] = number exts['saider'] = diger try: tvy.processEvent(**exts) except AttributeError as ex: raise ValidationError(f"No tevery to process so dropped msg" f"={serder.pretty()}.") from ex else: raise ValidationError(f"Unexpected message {ilk=} for evt=" f"{serder.pretty()}") elif isinstance(serder, SerderACDC): ilk = serder.ilk # dispatch based on ilk if ilk is None: # default for ACDC try: # use last one if more than one prefixer, number, diger = exts['ssts'][-1] if exts['ssts'] else (None, None, None) exts['prefixer'] = prefixer exts['seqner'] = number exts['saider'] = diger vry.processACDC(**exts) except AttributeError as ex: raise ValidationError(f"No verifier to process so " f"dropped ACDC={serder.pretty()}") from ex else: raise ValidationError(f"Unexpected message ilk = {ilk} " f"for evt={serder.pretty()}") else: raise ValidationError(f"Unexpected protocol type={serder.proto}" f" for event message={serder.pretty()}.") return True # done state
# Group parse/extract methods for dispatch based on CESR version def _ControllerIdxSigs1(self, exts, ims, ctr, cold, abort): """Generator to extract CESRv1 ControllerIdxSigs group Parameters: exts (dict): of extracted group elements for keyword args. ims (bytearray): of serialized incoming message stream. ctr (Counter): instance of CESR v1 Counter of code .ControllerIdxSigs cold (Coldage): assumes str value is either Colds.txt or Colds.bny abort (bool): True means abort if not enough bytes in ims. Use when this group is enclosed in another group that has already been extracted from stream False yield if not enough bytes in ims. Use when this group is at top level of stream not enclosed in another already extracted group. Returns: sigers (list[Siger]): of indexed signature instances """ sigers = [] for i in range(ctr.count): # extract each attached signature siger = yield from self._extractor(ims=ims, klas=Siger, cold=cold, abort=abort) sigers.append(siger) try: exts['sigers'].extend(sigers) except KeyError: exts['sigers'] = sigers def _ControllerIdxSigs2(self, exts, ims, ctr, cold, abort): """Generator to extract CESRv2 ControllerIdxSigs group Parameters: exts (dict): of extracted group elements for keyword args. ims (bytearray): of serialized incoming message stream. ctr (Counter): instance of CESR v1 Counter of code .ControllerIdxSigs cold (Coldage): assumes str value is either Colds.txt or Colds.bny abort (bool): True means abort if not enough bytes in ims. Use when this group is enclosed in another group that has already been extracted from stream False yield if not enough bytes in ims. Use when this group is at top level of stream not enclosed in another already extracted group. 
Returns: sigers (list[Siger]): of indexed signature instances """ gs = ctr.byteCount(cold=cold) # ctr.count * 4 if cold == Colds.txt else ctr.count * 3 while len(ims) < gs: if abort: # assumes already full frame extracted unexpected problem raise ShortageError(f"Unexpected stream shortage on enclosed " f"group code={ctr.qb64}") yield # wait until have full group size gims = ims[:gs] # copy out group sized substream del ims[:gs] # strip off from ims sigers = [] while gims: # extract each attached signature and strip from gims sigers.append(self.extract(ims=gims, klas=Siger, cold=cold)) try: exts['sigers'].extend(sigers) except KeyError: exts['sigers'] = sigers def _WitnessIdxSigs1(self, exts, ims, ctr, cold, abort): """Generator to extract CESRv1 WitnessIdxSigs group Parameters: exts (dict): of extracted group elements for keyword args. ims (bytearray): of serialized incoming message stream. ctr (Counter): instance of CESR v1 Counter of code .ControllerIdxSigs cold (Coldage): assumes str value is either Colds.txt or Colds.bny abort (bool): True means abort if not enough bytes in ims. Use when this group is enclosed in another group that has already been extracted from stream False yield if not enough bytes in ims. Use when this group is at top level of stream not enclosed in another already extracted group. Returns: wigers (list[Siger]): of indexed signature instances """ wigers = [] for i in range(ctr.count): # extract each attached signature wiger = yield from self._extractor(ims=ims, klas=Siger, cold=cold, abort=abort) wigers.append(wiger) try: exts['wigers'].extend(wigers) except KeyError: exts['wigers'] = wigers def _WitnessIdxSigs2(self, exts, ims, ctr, cold, abort): """Generator to extract CESRv2 WitnessIdxSigs group Parameters: exts (dict): of extracted group elements for keyword args. ims (bytearray): of serialized incoming message stream. 
ctr (Counter): instance of CESR v1 Counter of code .ControllerIdxSigs cold (Coldage): assumes str value is either Colds.txt or Colds.bny abort (bool): True means abort if not enough bytes in ims. Use when this group is enclosed in another group that has already been extracted from stream False yield if not enough bytes in ims. Use when this group is at top level of stream not enclosed in another already extracted group. Returns: wigers (list[Siger]): of indexed signature instances """ gs = ctr.byteCount(cold=cold) while len(ims) < gs: if abort: # assumes already full frame extracted unexpected problem raise ShortageError(f"Unexpected stream shortage on enclosed " f"group code={ctr.qb64}") yield # wait until have full group size gims = ims[:gs] # copy out group sized substream del ims[:gs] # strip off from ims wigers = [] while gims: # extract each attached signature and strip from gims wigers.append(self.extract(ims=gims, klas=Siger, cold=cold)) try: exts['wigers'].extend(wigers) except KeyError: exts['wigers'] = wigers def _NonTransReceiptCouples1(self, exts, ims, ctr, cold, abort): """Generator to extract CESRv1 NonTransReceiptCouples group Parameters: exts (dict): of extracted group elements for keyword args. ims (bytearray): of serialized incoming message stream. ctr (Counter): instance of CESR v1 Counter of code .ControllerIdxSigs cold (Coldage): assumes str value is either Colds.txt or Colds.bny abort (bool): True means abort if not enough bytes in ims. Use when this group is enclosed in another group that has already been extracted from stream False yield if not enough bytes in ims. Use when this group is at top level of stream not enclosed in another already extracted group. 
Returns: cigars (list[Cigar]): of signature instances with assigned verfer """ cigars = [] for i in range(ctr.count): # extract each attached couple verfer = yield from self._extractor(ims=ims, klas=Verfer, cold=cold, abort=abort) cigar = yield from self._extractor(ims=ims, klas=Cigar, cold=cold, abort=abort) cigar.verfer = verfer cigars.append(cigar) try: exts['cigars'].extend(cigars) except KeyError: exts['cigars'] = cigars def _NonTransReceiptCouples2(self, exts, ims, ctr, cold, abort): """Generator to extract CESRv2 NonTransReceiptCouples group Parameters: exts (dict): of extracted group elements for keyword args. ims (bytearray): of serialized incoming message stream. ctr (Counter): instance of CESR v1 Counter of code .ControllerIdxSigs cold (Coldage): assumes str value is either Colds.txt or Colds.bny abort (bool): True means abort if not enough bytes in ims. Use when this group is enclosed in another group that has already been extracted from stream False yield if not enough bytes in ims. Use when this group is at top level of stream not enclosed in another already extracted group. 
Returns: cigars (list[Cigar]): of signature instances with assigned verfer """ gs = ctr.byteCount(cold=cold) while len(ims) < gs: if abort: # assumes already full frame extracted unexpected problem raise ShortageError(f"Unexpected stream shortage on enclosed " f"group code={ctr.qb64}") yield # wait until have full group size gims = ims[:gs] # copy out group sized substream del ims[:gs] # strip off from ims cigars = [] while gims: # extract each attached couple and strip from gims verfer = self.extract(ims=gims, klas=Verfer, cold=cold) cigar = self.extract(ims=gims, klas=Cigar, cold=cold) cigar.verfer = verfer cigars.append(cigar) try: exts['cigars'].extend(cigars) except KeyError: exts['cigars'] = cigars def _TransReceiptQuadruples1(self, exts, ims, ctr, cold, abort): """Generator to extract CESRv1 TransReceiptQuadruples group Parameters: exts (dict): of extracted group elements for keyword args. ims (bytearray): of serialized incoming message stream. ctr (Counter): instance of CESR v1 Counter of code .ControllerIdxSigs cold (Coldage): assumes str value is either Colds.txt or Colds.bny abort (bool): True means abort if not enough bytes in ims. Use when this group is enclosed in another group that has already been extracted from stream False yield if not enough bytes in ims. Use when this group is at top level of stream not enclosed in another already extracted group. 
Returns: trqs (list[tuple]): [(prefixer,seqner,saider,siger)] extract attaced trans receipt vrc quadruple spre+ssnu+sdig+sig spre is pre of signer of vrc ssnu is sn of signer's est evt when signed sdig is dig of signer's est event when signed sig is indexed signature of signer on this event msg """ trqs = [] for i in range(ctr.count): # extract each attached quadruple prefixer = yield from self._extractor(ims=ims, klas=Prefixer, cold=cold, abort=abort) number = yield from self._extractor(ims=ims, klas=Number, cold=cold, abort=abort) diger = yield from self._extractor(ims=ims, klas=Diger, cold=cold, abort=abort) siger = yield from self._extractor(ims=ims, klas=Siger, cold=cold, abort=abort) trqs.append((prefixer, number, diger, siger)) try: exts['trqs'].extend(trqs) except KeyError: exts['trqs'] = trqs def _TransReceiptQuadruples2(self, exts, ims, ctr, cold, abort): """Generator to extract CESRv2 TransReceiptQuadruples group Parameters: exts (dict): of extracted group elements for keyword args. ims (bytearray): of serialized incoming message stream. ctr (Counter): instance of CESR v1 Counter of code .ControllerIdxSigs cold (Coldage): assumes str value is either Colds.txt or Colds.bny abort (bool): True means abort if not enough bytes in ims. Use when this group is enclosed in another group that has already been extracted from stream False yield if not enough bytes in ims. Use when this group is at top level of stream not enclosed in another already extracted group. 
Returns: trqs (list[tuple]): [(prefixer,number,diger,siger)] extract attaced trans receipt vrc quadruple spre+ssnu+sdig+sig spre is pre of signer of vrc ssnu is sn of signer's est evt when signed sdig is dig of signer's est event when signed sig is indexed signature of signer on this event msg """ gs = ctr.byteCount(cold=cold) while len(ims) < gs: if abort: # assumes already full frame extracted unexpected problem raise ShortageError(f"Unexpected stream shortage on enclosed " f"group code={ctr.qb64}") yield # wait until have full group size gims = ims[:gs] # copy out group sized substream del ims[:gs] # strip off from ims trqs = [] while gims: # extract each attached quadruple and strip from gims prefixer = self.extract(ims=gims, klas=Prefixer, cold=cold) number = self.extract(ims=gims, klas=Number, cold=cold) diger = self.extract(ims=gims, klas=Diger, cold=cold) siger = self.extract(ims=gims, klas=Siger, cold=cold) trqs.append((prefixer, number, diger, siger)) try: exts['trqs'].extend(trqs) except KeyError: exts['trqs'] = trqs def _TransIdxSigGroups1(self, exts, ims, ctr, cold, abort): """Generator to extract CESRv1 TransIdxSigGroups group Parameters: exts (dict): of extracted group elements for keyword args. ims (bytearray): of serialized incoming message stream. ctr (Counter): instance of CESR v1 Counter of code .ControllerIdxSigs cold (Coldage): assumes str value is either Colds.txt or Colds.bny abort (bool): True means abort if not enough bytes in ims. Use when this group is enclosed in another group that has already been extracted from stream False yield if not enough bytes in ims. Use when this group is at top level of stream not enclosed in another already extracted group. 
Returns: tsgs (list[tuple]): [(prefixer,number,diger,[isigers])] """ tsgs = [] for i in range(ctr.count): # extract each attached group prefixer = yield from self._extractor(ims=ims, klas=Prefixer, cold=cold, abort=abort) number = yield from self._extractor(ims=ims, klas=Number, cold=cold, abort=abort) diger = yield from self._extractor(ims=ims, klas=Diger, cold=cold, abort=abort) ictr = yield from self._extractor(ims=ims, klas=Counter, cold=cold, abort=abort) if ictr.code != CtrDex_1_0.ControllerIdxSigs: raise UnexpectedCountCodeError(f"Expected count code=" f"{CtrDex_1_0.ControllerIdxSigs}, got code={ictr.code}") isigers = [] for i in range(ictr.count): # extract each signature in idx cnt isiger = yield from self._extractor(ims=ims, klas=Siger, cold=cold, abort=abort) isigers.append(isiger) tsgs.append((prefixer, number, diger, isigers)) try: exts['tsgs'].extend(tsgs) except KeyError: exts['tsgs'] = tsgs def _TransIdxSigGroups2(self, exts, ims, ctr, cold, abort): """Generator to extract CESRv2 TransIdxSigGroups group Parameters: exts (dict): of extracted group elements for keyword args. ims (bytearray): of serialized incoming message stream. ctr (Counter): instance of CESR v1 Counter of code .ControllerIdxSigs cold (Coldage): assumes str value is either Colds.txt or Colds.bny abort (bool): True means abort if not enough bytes in ims. Use when this group is enclosed in another group that has already been extracted from stream False yield if not enough bytes in ims. Use when this group is at top level of stream not enclosed in another already extracted group. 
Returns: tsgs (list[tuple]): [(prefixer,number,diger,[isigers])] """ gs = ctr.byteCount(cold=cold) while len(ims) < gs: if abort: # assumes already full frame extracted unexpected problem raise ShortageError(f"Unexpected stream shortage on enclosed " f"group code={ctr.qb64}") yield # wait until have full group size gims = ims[:gs] # copy out group sized substream del ims[:gs] # strip off from ims tsgs = [] isigers = [] while gims: # extract each attached group and strip from gims prefixer = self.extract(ims=gims, klas=Prefixer, cold=cold) number = self.extract(ims=gims, klas=Number, cold=cold) diger = self.extract(ims=gims, klas=Diger, cold=cold) ictr = self.extract(ims=gims, klas=Counter, cold=cold) if ictr.code != CtrDex_2_0.ControllerIdxSigs: raise UnexpectedCountCodeError(f"Expected count code=" f"{CtrDex_2_0.ControllerIdxSigs}, got code={ictr.code}") igs = ictr.byteCount(cold=cold) # already extracted enclosing group bytes so igs must be < len(gims) if len(gims) < igs: # should not happen unless malformed counter raise ShortageError(f"Unexpected stream shortage on enclosed " f"group code={ctr.qb64}") igims = gims[:igs] del gims[:igs] # strip igims from gims isigers = [] while igims: isiger = self.extract(ims=igims, klas=Siger, cold=cold) isigers.append(isiger) tsgs.append((prefixer, number, diger, isigers)) # tuple try: exts['tsgs'].extend(tsgs) except KeyError: exts['tsgs'] = tsgs def _TransLastIdxSigGroups1(self, exts, ims, ctr, cold, abort): """Generator to extract CESRv1 TransLastIdxSigGroups group Parameters: exts (dict): of extracted group elements for keyword args. ims (bytearray): of serialized incoming message stream. ctr (Counter): instance of CESR v1 Counter of code .ControllerIdxSigs cold (Coldage): assumes str value is either Colds.txt or Colds.bny abort (bool): True means abort if not enough bytes in ims. Use when this group is enclosed in another group that has already been extracted from stream False yield if not enough bytes in ims. 
Use when this group is at top level of stream not enclosed in another already extracted group. Returns: ssgs (list[tuple]): [(prefixer, [isigers])] """ ssgs = [] for i in range(ctr.count): # extract each attached group prefixer = yield from self._extractor(ims=ims, klas=Prefixer, cold=cold, abort=abort) ictr = yield from self._extractor(ims=ims, klas=Counter, cold=cold, abort=abort) if ictr.code != CtrDex_1_0.ControllerIdxSigs: raise UnexpectedCountCodeError(f"Expected count code=" f"{CtrDex_1_0.ControllerIdxSigs}, got code={ictr.code}") isigers = [] for i in range(ictr.count): # extract each signature in idx cnt isiger = yield from self._extractor(ims=ims, klas=Siger, cold=cold, abort=abort) isigers.append(isiger) ssgs.append((prefixer, isigers)) try: exts['ssgs'].extend(ssgs) except KeyError: exts['ssgs'] = ssgs def _TransLastIdxSigGroups2(self, exts, ims, ctr, cold, abort): """Generator to extract CESRv2 TransLastIdxSigGroups group Parameters: exts (dict): of extracted group elements for keyword args. ims (bytearray): of serialized incoming message stream. ctr (Counter): instance of CESR v1 Counter of code .ControllerIdxSigs cold (Coldage): assumes str value is either Colds.txt or Colds.bny abort (bool): True means abort if not enough bytes in ims. Use when this group is enclosed in another group that has already been extracted from stream False yield if not enough bytes in ims. Use when this group is at top level of stream not enclosed in another already extracted group. 
Returns: ssgs (list[tuple]): [(prefixer, [isigers])] """ gs = ctr.byteCount(cold=cold) while len(ims) < gs: if abort: # assumes already full frame extracted unexpected problem raise ShortageError(f"Unexpected stream shortage on enclosed " f"group code={ctr.qb64}") yield # wait until have full group size gims = ims[:gs] # copy out group sized substream del ims[:gs] # strip off from ims ssgs = [] isigers = [] while gims: # extract each attached group strip from gims prefixer = self.extract(ims=gims, klas=Prefixer, cold=cold) ictr = self.extract(ims=gims, klas=Counter, cold=cold) if ictr.code != CtrDex_2_0.ControllerIdxSigs: raise UnexpectedCountCodeError(f"Expected count code=" f"{CtrDex_2_0.ControllerIdxSigs}, got code={ictr.code}") igs = ictr.byteCount(cold=cold) # already extracted enclosing group bytes so igs must be < len(gims) if len(gims) < igs: # should not happen unless malformed counter raise ShortageError(f"Unexpected stream shortage on enclosed " f"group code={ctr.qb64}") igims = gims[:igs] del gims[:igs] # strip igims from gims isigers = [] while igims: isiger = self.extract(ims=igims, klas=Siger, cold=cold) isigers.append(isiger) ssgs.append((prefixer, isigers)) # tuple try: exts['ssgs'].extend(ssgs) except KeyError: exts['ssgs'] = ssgs def _FirstSeenReplayCouples1(self, exts, ims, ctr, cold, abort): """Generator to extract CESRv1 FirstSeenReplayCouples group Parameters: exts (dict): of extracted group elements for keyword args. ims (bytearray): of serialized incoming message stream. ctr (Counter): instance of CESR v1 Counter of code .ControllerIdxSigs cold (Coldage): assumes str value is either Colds.txt or Colds.bny abort (bool): True means abort if not enough bytes in ims. Use when this group is enclosed in another group that has already been extracted from stream False yield if not enough bytes in ims. Use when this group is at top level of stream not enclosed in another already extracted group. 
Returns: frcs (list[tuple]): [(number, dater)] first seen sn """ frcs = [] for i in range(ctr.count): # extract each attached group firner = yield from self._extractor(ims=ims, klas=Number, cold=cold, abort=abort) dater = yield from self._extractor(ims=ims, klas=Dater, cold=cold, abort=abort) frcs.append((firner, dater)) try: exts['frcs'].extend(frcs) except KeyError: exts['frcs'] = frcs def _FirstSeenReplayCouples2(self, exts, ims, ctr, cold, abort): """Generator to extract CESRv2 FirstSeenReplayCouples group Parameters: exts (dict): of extracted group elements for keyword args. ims (bytearray): of serialized incoming message stream. ctr (Counter): instance of CESR v1 Counter of code .ControllerIdxSigs cold (Coldage): assumes str value is either Colds.txt or Colds.bny abort (bool): True means abort if not enough bytes in ims. Use when this group is enclosed in another group that has already been extracted from stream False yield if not enough bytes in ims. Use when this group is at top level of stream not enclosed in another already extracted group. Returns: frcs (list[tuple]): [(number, dater)] first seen sn """ gs = ctr.byteCount(cold=cold) while len(ims) < gs: if abort: # assumes already full frame extracted unexpected problem raise ShortageError(f"Unexpected stream shortage on enclosed " f"group code={ctr.qb64}") yield # wait until have full group size gims = ims[:gs] # copy out group sized substream del ims[:gs] # strip off from ims frcs = [] while gims: # extract each attached group and strip from gims firner = self.extract(ims=gims, klas=Number, cold=cold) dater = self.extract(ims=gims, klas=Dater, cold=cold) frcs.append((firner, dater)) try: exts['frcs'].extend(frcs) except KeyError: exts['frcs'] = frcs def _SealSourceCouples1(self, exts, ims, ctr, cold, abort): """Generator to extract CESRv1 SealSourceCouples group Parameters: exts (dict): of extracted group elements for keyword args. ims (bytearray): of serialized incoming message stream. 
ctr (Counter): instance of CESR v1 Counter of code .ControllerIdxSigs cold (Coldage): assumes str value is either Colds.txt or Colds.bny abort (bool): True means abort if not enough bytes in ims. Use when this group is enclosed in another group that has already been extracted from stream False yield if not enough bytes in ims. Use when this group is at top level of stream not enclosed in another already extracted group. Returns: sscs (list[tuple]): [(seqner, number)] """ sscs = [] for i in range(ctr.count): # extract each attached group number = yield from self._extractor(ims=ims, klas=Number, cold=cold, abort=abort) diger = yield from self._extractor(ims=ims, klas=Diger, cold=cold, abort=abort) sscs.append((number, diger)) try: exts['sscs'].extend(sscs) except KeyError: exts['sscs'] = sscs def _SealSourceCouples2(self, exts, ims, ctr, cold, abort): """Generator to extract CESRv2 SealSourceCouples group Parameters: exts (dict): of extracted group elements for keyword args. ims (bytearray): of serialized incoming message stream. ctr (Counter): instance of CESR v1 Counter of code .ControllerIdxSigs cold (Coldage): assumes str value is either Colds.txt or Colds.bny abort (bool): True means abort if not enough bytes in ims. Use when this group is enclosed in another group that has already been extracted from stream False yield if not enough bytes in ims. Use when this group is at top level of stream not enclosed in another already extracted group. 
Returns: sscs (list[tuple]): [(seqner, number)] """ gs = ctr.byteCount(cold=cold) while len(ims) < gs: if abort: # assumes already full frame extracted unexpected problem raise ShortageError(f"Unexpected stream shortage on enclosed " f"group code={ctr.qb64}") yield # wait until have full group size gims = ims[:gs] # copy out group sized substream del ims[:gs] # strip off from ims sscs = [] while gims: # extract each attached group and strip from gims number = self.extract(ims=gims, klas=Number, cold=cold) diger = self.extract(ims=gims, klas=Diger, cold=cold) sscs.append((number, diger)) try: exts['sscs'].extend(sscs) except KeyError: exts['sscs'] = sscs def _SealSourceTriples1(self, exts, ims, ctr, cold, abort): """Generator to extract CESRv1 SealSourceTriples group Parameters: exts (dict): of extracted group elements for keyword args. ims (bytearray): of serialized incoming message stream. ctr (Counter): instance of CESR v1 Counter of code .ControllerIdxSigs cold (Coldage): assumes str value is either Colds.txt or Colds.bny abort (bool): True means abort if not enough bytes in ims. Use when this group is enclosed in another group that has already been extracted from stream False yield if not enough bytes in ims. Use when this group is at top level of stream not enclosed in another already extracted group. 
Returns: ssts (list[tuple]): [(prefixer, seqner, saider)] """ ssts = [] for i in range(ctr.count): # extract each attached group prefixer = yield from self._extractor(ims=ims, klas=Prefixer, cold=cold, abort=abort) number = yield from self._extractor(ims=ims, klas=Number, cold=cold, abort=abort) diger = yield from self._extractor(ims=ims, klas=Diger, cold=cold, abort=abort) ssts.append((prefixer, number, diger)) try: exts['ssts'].extend(ssts) except KeyError: exts['ssts'] = ssts def _SealSourceTriples2(self, exts, ims, ctr, cold, abort): """Generator to extract and strip CESRv2 SealSourceTriples group Parameters: exts (dict): of extracted group elements for keyword args. ims (bytearray): of serialized incoming message stream. ctr (Counter): instance of CESR v1 Counter of code .ControllerIdxSigs cold (Coldage): assumes str value is either Colds.txt or Colds.bny abort (bool): True means abort if not enough bytes in ims. Use when this group is enclosed in another group that has already been extracted from stream False yield if not enough bytes in ims. Use when this group is at top level of stream not enclosed in another already extracted group. 
Returns: ssts (list[tuple]): [(prefixer, seqner, saider)] """ gs = ctr.byteCount(cold=cold) while len(ims) < gs: if abort: # assumes already full frame extracted unexpected problem raise ShortageError(f"Unexpected stream shortage on enclosed " f"group code={ctr.qb64}") yield # wait until have full group size gims = ims[:gs] # copy out group sized substream del ims[:gs] # strip off from ims ssts = [] while gims: # extract each attached group and strip from gims prefixer = self.extract(ims=gims, klas=Prefixer, cold=cold) number = self.extract(ims=gims, klas=Number, cold=cold) diger = self.extract(ims=gims, klas=Diger, cold=cold) ssts.append((prefixer, number, diger)) try: exts['ssts'].extend(ssts) except KeyError: exts['ssts'] = ssts def _TypedDigestSealCouples(self, exts, ims, ctr, cold, abort): """Generator to extract CESRv2 TypedDigestSealCouples group Parameters: exts (dict): of extracted group elements for keyword args. ims (bytearray): of serialized incoming message stream. ctr (Counter): instance of CESR v1 Counter of code .ControllerIdxSigs cold (Coldage): assumes str value is either Colds.txt or Colds.bny abort (bool): True means abort if not enough bytes in ims. Use when this group is enclosed in another group that has already been extracted from stream False yield if not enough bytes in ims. Use when this group is at top level of stream not enclosed in another already extracted group. 
Returns: tdcs (list[tuple]): [(verser, diger)] """ gs = ctr.byteCount(cold=cold) while len(ims) < gs: if abort: # assumes already full frame extracted unexpected problem raise ShortageError(f"Unexpected stream shortage on enclosed " f"group code={ctr.qb64}") yield # wait until have full group size gims = ims[:gs] # copy out group sized substream del ims[:gs] # strip off from ims tdcs = [] while gims: # extract each attached group and strip from gims verser = self.extract(ims=gims, klas=Verser, cold=cold) diger = self.extract(ims=gims, klas=Diger, cold=cold) tdcs.append((verser, diger)) try: exts['tdcs'].extend(tdcs) except KeyError: exts['tdcs'] = tdcs def _PathedMaterialCouples(self, exts, ims, ctr, cold, abort): """Generator to extract and strip CESR v1 and v2 PathedMaterialCouples Includes both big and small sized groups. Since v1 counts quadlets/triples the logic is the same for both v1 and v2. The contexts of a pathed material group MUST be a CESR attachment sub-stream i.e. primitives or groups of primitives. It may not include any top-level messages expecially not any messages as JSON, CBOR, MGPK Parameters: exts (dict): of extracted group elements for keyword args. ims (bytearray): of serialized incoming message stream. ctr (Counter): instance of CESR v1 Counter of code .ControllerIdxSigs cold (Coldage): assumes str value is either Colds.txt or Colds.bny abort (bool): True means abort if not enough bytes in ims. Use when this group is enclosed in another group that has already been extracted from stream False yield if not enough bytes in ims. Use when this group is at top level of stream not enclosed in another already extracted group. 
Returns: pims (list[bytes]): [gims] """ gs = ctr.byteCount(cold=cold) while len(ims) < gs: if abort: # assumes already full frame extracted unexpected problem raise ShortageError(f"Unexpected stream shortage on enclosed " f"group code={ctr.qb64}") yield # wait until have full group size gims = ims[:gs] # copy out group sized substream del ims[:gs] # strip off from ims try: exts['ptds'].extend([gims]) except KeyError: exts['ptds'] = [gims] def _ESSRPayloadGroup1(self, exts, ims, ctr, cold, abort): """Generator to extract CESRv1 ESSRPayloadGroup group Parameters: exts (dict): of extracted group elements for keyword args. ims (bytearray): of serialized incoming message stream. ctr (Counter): instance of CESR v1 Counter of code .ControllerIdxSigs cold (Coldage): assumes str value is either Colds.txt or Colds.bny abort (bool): True means abort if not enough bytes in ims. Use when this group is enclosed in another group that has already been extracted from stream False yield if not enough bytes in ims. Use when this group is at top level of stream not enclosed in another already extracted group. Returns: essrs (list[Texter]): [texter] """ essrs = [] for i in range(ctr.count): # extract each attached group texter = yield from self._extractor(ims=ims, klas=Texter, cold=cold, abort=abort) essrs.append(texter) try: exts['essrs'].extend(essrs) except KeyError: exts['essrs'] = essrs def _ESSRPayloadGroup2(self, exts, ims, ctr, cold, abort): """Generator to extract CESRv2 ESSRPayloadGroup group Parameters: exts (dict): of extracted group elements for keyword args. ims (bytearray): of serialized incoming message stream. ctr (Counter): instance of CESR v1 Counter of code .ControllerIdxSigs cold (Coldage): assumes str value is either Colds.txt or Colds.bny abort (bool): True means abort if not enough bytes in ims. Use when this group is enclosed in another group that has already been extracted from stream False yield if not enough bytes in ims. 
Use when this group is at top level of stream not enclosed in another already extracted group. Returns: essrs (list[Texter]): [texter] """ gs = ctr.byteCount(cold=cold) while len(ims) < gs: if abort: # assumes already full frame extracted unexpected problem raise ShortageError(f"Unexpected stream shortage on enclosed " f"group code={ctr.qb64}") yield # wait until have full group size gims = ims[:gs] # copy out group sized substream del ims[:gs] # strip off from ims essrs = [] while gims: # extract each attached group and strip from gims texter = self.extract(ims=gims, klas=Texter, cold=cold) essrs.append(texter) try: exts['essrs'].extend(essrs) except KeyError: exts['essrs'] = essrs def _BlindedStateQuadruples(self, exts, ims, ctr, cold, abort): """Generator to extract and strip CESRv2 BlindedStateQuadruples group Parameters: exts (dict): of extracted group elements for keyword args. ims (bytearray): of serialized incoming message stream. ctr (Counter): instance of CESR v1 Counter of code .ControllerIdxSigs cold (Coldage): assumes str value is either Colds.txt or Colds.bny abort (bool): True means abort if not enough bytes in ims. Use when this group is enclosed in another group that has already been extracted from stream False yield if not enough bytes in ims. Use when this group is at top level of stream not enclosed in another already extracted group. 
Returns: bsqs (list[tuple]): [(diger, noncer, acdcer, stater)] """ gs = ctr.byteCount(cold=cold) while len(ims) < gs: if abort: # assumes already full frame extracted unexpected problem raise ShortageError(f"Unexpected stream shortage on enclosed " f"group code={ctr.qb64}") yield # wait until have full group size gims = ims[:gs] # copy out group sized substream del ims[:gs] # strip off from ims bsqs = [] while gims: # extract each attached group and strip from gims diger = self.extract(ims=gims, klas=Diger, cold=cold) noncer = self.extract(ims=gims, klas=Noncer, cold=cold) # Noncer may be empty code acdcer = self.extract(ims=gims, klas=Noncer, cold=cold) # Noncer may be empty code stater = self.extract(ims=gims, klas=Labeler, cold=cold) # Labeler may be empty code bsqs.append((diger, noncer, acdcer, stater)) try: exts['bsqs'].extend(bsqs) except KeyError: exts['bsqs'] = bsqs def _BoundStateSextuples(self, exts, ims, ctr, cold, abort): """Generator to extract and strip CESRv2 BoundStateSextuples group Parameters: exts (dict): of extracted group elements for keyword args. ims (bytearray): of serialized incoming message stream. ctr (Counter): instance of CESR v1 Counter of code .ControllerIdxSigs cold (Coldage): assumes str value is either Colds.txt or Colds.bny abort (bool): True means abort if not enough bytes in ims. Use when this group is enclosed in another group that has already been extracted from stream False yield if not enough bytes in ims. Use when this group is at top level of stream not enclosed in another already extracted group. 
Returns: bsss (list[tuple]): [(diger, noncer, acdcer, stater, number, eventer)] """ gs = ctr.byteCount(cold=cold) while len(ims) < gs: if abort: # assumes already full frame extracted unexpected problem raise ShortageError(f"Unexpected stream shortage on enclosed " f"group code={ctr.qb64}") yield # wait until have full group size gims = ims[:gs] # copy out group sized substream del ims[:gs] # strip off from ims bsss = [] while gims: # extract each attached group and strip from gims diger = self.extract(ims=gims, klas=Diger, cold=cold) noncer = self.extract(ims=gims, klas=Noncer, cold=cold) # Noncer may be empty code acdcer = self.extract(ims=gims, klas=Noncer, cold=cold) # Noncer may be empty code stater = self.extract(ims=gims, klas=Labeler, cold=cold) # Labeler may be empty code number = self.extract(ims=gims, klas=Number, cold=cold) # Labeler may be empty code eventer = self.extract(ims=gims, klas=Noncer, cold=cold) # Noncer may be empty code bsss.append((diger, noncer, acdcer, stater, number, eventer)) try: exts['bsss'].extend(bsss) except KeyError: exts['bsss'] = bsss def _TypedMediaQuadruples(self, exts, ims, ctr, cold, abort): """Generator to extract and strip CESRv2 TypedMediaQuadruples group Parameters: exts (dict): of extracted group elements for keyword args. ims (bytearray): of serialized incoming message stream. ctr (Counter): instance of CESR v1 Counter of code .ControllerIdxSigs cold (Coldage): assumes str value is either Colds.txt or Colds.bny abort (bool): True means abort if not enough bytes in ims. Use when this group is enclosed in another group that has already been extracted from stream False yield if not enough bytes in ims. Use when this group is at top level of stream not enclosed in another already extracted group. 
Returns: tmqs (list[tuple]): [(diger, noncer, labeler, texter)] """ gs = ctr.byteCount(cold=cold) while len(ims) < gs: if abort: # assumes already full frame extracted unexpected problem raise ShortageError(f"Unexpected stream shortage on enclosed " f"group code={ctr.qb64}") yield # wait until have full group size gims = ims[:gs] # copy out group sized substream del ims[:gs] # strip off from ims tmqs = [] while gims: # extract each attached group and strip from gims diger = self.extract(ims=gims, klas=Diger, cold=cold) noncer = self.extract(ims=gims, klas=Noncer, cold=cold) # Noncer may be empty code labeler = self.extract(ims=gims, klas=Labeler, cold=cold) texter = self.extract(ims=gims, klas=Texter, cold=cold) tmqs.append((diger, noncer, labeler, texter)) try: exts['tmqs'].extend(tmqs) except KeyError: exts['tmqs'] = tmqs