# -*- encoding: utf-8 -*-
"""
keri.core.parsing module
message stream parsing support
"""
import logging
import traceback
from collections import namedtuple
from dataclasses import dataclass, astuple
from .coring import (Ilks, CtrDex, Counter, Seqner, Siger, Cigar,
Dater, Verfer, Prefixer, Saider, Pather, Protos )
from . import serdering
from .. import help
from .. import kering
logger = help.ogler.getLogger()
[docs]
@dataclass(frozen=True)
class ColdCodex:
"""
ColdCodex is codex of cold stream start tritets of first byte
Only provide defined codes.
Undefined are left out so that inclusion(exclusion) via 'in' operator works.
First three bits:
0o0 = 000 free
0o1 = 001 cntcode B64
0o2 = 010 opcode B64
0o3 = 011 json
0o4 = 100 mgpk
0o5 = 101 cbor
0o6 = 110 mgpk
007 = 111 cntcode or opcode B2
status is one of ('evt', 'txt', 'bny' )
'evt' if tritet in (ColdDex.JSON, ColdDex.MGPK1, ColdDex.CBOR, ColdDex.MGPK2)
'txt' if tritet in (ColdDex.CtB64, ColdDex.OpB64)
'bny' if tritet in (ColdDex.CtOpB2,)
otherwise raise ColdStartError
x = bytearray([0x2d, 0x5f])
x == bytearray(b'-_')
x[0] >> 5 == 0o1
True
"""
Free: int = 0o0 # not taken
CtB64: int = 0o1 # CountCode Base64
OpB64: int = 0o2 # OpCode Base64
JSON: int = 0o3 # JSON Map Event Start
MGPK1: int = 0o4 # MGPK Fixed Map Event Start
CBOR: int = 0o5 # CBOR Map Event Start
MGPK2: int = 0o6 # MGPK Big 16 or 32 Map Event Start
CtOpB2: int = 0o7 # CountCode or OpCode Base2
def __iter__(self):
return iter(astuple(self))
ColdDex = ColdCodex() # Make instance
Coldage = namedtuple("Coldage", 'msg txt bny') # stream cold start status
Colds = Coldage(msg='msg', txt='txt', bny='bny')
[docs]
class Parser:
"""
Parser is stream parser that processes an incoming message stream.
Each message in the stream is composed of a message body with a message foot
The message body includes a version string. The message foot is composed of
composable concatenated attachments encoded in CESR (Composable Event
Streaming Representation) CESR supports both binary and text formats where
text is Base64 URL/Filesafe. The attachements in a CESR foot may be converted
and round tripped en-masse between binary and text (Base64 URL/File).
CESR encoding ensures alignment on 24 bit boundaries.
Only supports current version VERSION
Has the following public attributes and properties:
Attributes:
ims (bytearray): incoming message stream
framed (bool): True means stream is packet framed
pipeline (bool): True means use pipeline processor to process
whenever stream includes pipelined count codes.
kvy (Kevery): route KEL message types to this instance
tvy (Tevery): route TEL message types to this instance
"""
[docs]
def __init__(self, ims=None, framed=True, pipeline=False, kvy=None, tvy=None, exc=None, rvy=None, vry=None):
"""
Initialize instance:
Parameters:
ims (bytearray): incoming message stream
framed (bool): True means ims contains only one msg body plus
its foot of attachments, not multiple sets of msg body plus foot
pipeline (bool): True means use pipeline processor to process
ims msgs when stream includes pipelined count codes.
kvy (Kevery): route KEL message types to this instance
tvy (Tevery): route TEL message types to this instance
exc (Exchanger): route EXN message types to this instance
rvy (Revery): reply (RPY) message handler
vry (Verfifier): credential verifier with wallet storage
"""
self.ims = ims if ims is not None else bytearray()
self.framed = True if framed else False # extract until end-of-stream
self.pipeline = True if pipeline else False # process as pipelined
self.kvy = kvy
self.tvy = tvy
self.exc = exc
self.rvy = rvy
self.vry = vry
[docs]
@staticmethod
def sniff(ims):
"""
Returns status string of cold start of stream ims bytearray by looking
at first triplet of first byte to determin if message or counter code
and if counter code whether Base64 or Base2 representation
First three bits:
0o0 = 000 free
0o1 = 001 cntcode B64
0o2 = 010 opcode B64
0o3 = 011 json
0o4 = 100 mgpk
0o5 = 101 cbor
0o6 = 110 mgpk
007 = 111 cntcode or opcode B2
counter B64 in (0o1, 0o2) return 'txt'
counter B2 in (0o7) return 'bny'
event in (0o3, 0o4, 0o5, 0o6) return 'evt'
unexpected in (0o0) raise ColdStartError
Colds = Coldage(msg='msg', txt='txt', bny='bny')
'msg' if tritet in (ColdDex.JSON, ColdDex.MGPK1, ColdDex.CBOR, ColdDex.MGPK2)
'txt' if tritet in (ColdDex.CtB64, ColdDex.OpB64)
'bny' if tritet in (ColdDex.CtOpB2,)
"""
if not ims:
raise kering.ShortageError("Need more bytes.")
tritet = ims[0] >> 5
if tritet in (ColdDex.JSON, ColdDex.MGPK1, ColdDex.CBOR, ColdDex.MGPK2):
return Colds.msg
if tritet in (ColdDex.CtB64, ColdDex.OpB64):
return Colds.txt
if tritet in (ColdDex.CtOpB2,):
return Colds.bny
raise kering.ColdStartError("Unexpected tritet={} at stream start.".format(tritet))
@staticmethod
def _extractor(ims, klas, cold=Colds.txt, abort=False):
"""
Returns generator to extract and return instance of klas from input
message stream, ims, given stream state, cold, is txt or bny.
If wait is True then yield when not enough bytes in stream otherwise
raise ShortageError
Inits klas from ims using qb64b or qb2 parameter based on cold.
Yields if not enough bytes in ims to fill out klas instance.
Usage:
instance = self._extractor
"""
while True:
try:
if cold == Colds.txt:
return klas(qb64b=ims, strip=True)
elif cold == Colds.bny:
return klas(qb2=ims, strip=True)
else:
raise kering.ColdStartError("Invalid stream state cold={}.".format(cold))
except kering.ShortageError as ex:
if abort: # pipelined pre-collects full frame before extracting
raise # bad pipelined frame so abort by raising error
yield
def _sadPathSigGroup(self, ctr, ims, root=None, cold=Colds.txt, pipelined=False):
"""
Args:
ctr (Counter): group type counter
ims (bytearray) of serialized incoming message stream.
May contain one or more sets each of a serialized message with
attached cryptographic material such as signatures or receipts.
root (Pather) optional root path of this group
cold (str): next charater Coldage type indicayor
pipelined (bool) True means use pipeline processor to process
ims msgs when stream includes pipelined count codes.
Returns:
"""
if ctr.code != CtrDex.SadPathSig:
raise kering.UnexpectedCountCodeError("Wrong "
"count code={}.Expected code={}."
"".format(ctr.code, CtrDex.ControllerIdxSigs))
subpath = yield from self._extractor(ims,
klas=Pather,
cold=cold,
abort=pipelined)
if root is not None:
subpath = subpath.root(root=root)
sctr = yield from self._extractor(ims=ims,
klas=Counter,
cold=cold,
abort=pipelined)
if sctr.code == CtrDex.TransIdxSigGroups:
for prefixer, seqner, saider, isigers in self._transIdxSigGroups(sctr, ims, cold=cold, pipelined=pipelined):
yield sctr.code, (subpath, prefixer, seqner, saider, isigers)
elif sctr.code == CtrDex.ControllerIdxSigs:
isigers = []
for i in range(sctr.count): # extract each attached signature
isiger = yield from self._extractor(ims=ims,
klas=Siger,
cold=cold,
abort=pipelined)
isigers.append(isiger)
yield sctr.code, (subpath, isigers)
elif sctr.code == CtrDex.NonTransReceiptCouples:
for cigar in self._nonTransReceiptCouples(ctr=sctr, ims=ims, cold=cold, pipelined=pipelined):
yield sctr.code, (subpath, cigar)
else:
raise kering.UnexpectedCountCodeError("Wrong "
"count code={}.Expected code={}."
"".format(ctr.code, CtrDex.ControllerIdxSigs))
def _transIdxSigGroups(self, ctr, ims, cold=Colds.txt, pipelined=False):
"""
Extract attaced trans indexed sig groups each made of
triple pre+snu+dig plus indexed sig group
pre is pre of signer (endorser) of msg
snu is sn of signer's est evt when signed
dig is dig of signer's est event when signed
followed by counter for ControllerIdxSigs with attached
indexed sigs from trans signer (endorser).
Parameters:
ctr (Counter): group type counter
ims (bytearray) of serialized incoming message stream.
May contain one or more sets each of a serialized message with
attached cryptographic material such as signatures or receipts.
cold (str): next charater Coldage type indicayor
pipelined (bool) True means use pipeline processor to process
ims msgs when stream includes pipelined count codes.
Yields:
"""
for i in range(ctr.count): # extract each attached groups
prefixer = yield from self._extractor(ims,
klas=Prefixer,
cold=cold,
abort=pipelined)
seqner = yield from self._extractor(ims,
klas=Seqner,
cold=cold,
abort=pipelined)
saider = yield from self._extractor(ims,
klas=Saider,
cold=cold,
abort=pipelined)
ictr = yield from self._extractor(ims=ims,
klas=Counter,
cold=cold,
abort=pipelined)
if ictr.code != CtrDex.ControllerIdxSigs:
raise kering.UnexpectedCountCodeError("Wrong "
"count code={}.Expected code={}."
"".format(ictr.code, CtrDex.ControllerIdxSigs))
isigers = []
for i in range(ictr.count): # extract each attached signature
isiger = yield from self._extractor(ims=ims,
klas=Siger,
cold=cold,
abort=pipelined)
isigers.append(isiger)
yield prefixer, seqner, saider, isigers
def _nonTransReceiptCouples(self, ctr, ims, cold=Colds.txt, pipelined=False):
"""
Extract attached rct couplets into list of sigvers
verfer property of cigar is the identifier prefix
cigar itself has the attached signature
Parameters:
ctr (Counter): group type counter
ims (bytearray) of serialized incoming message stream.
May contain one or more sets each of a serialized message with
attached cryptographic material such as signatures or receipts.
cold (str): next charater Coldage type indicayor
pipelined (bool) True means use pipeline processor to process
ims msgs when stream includes pipelined count codes.
Yields:
"""
for i in range(ctr.count): # extract each attached couple
verfer = yield from self._extractor(ims=ims,
klas=Verfer,
cold=cold,
abort=pipelined)
cigar = yield from self._extractor(ims=ims,
klas=Cigar,
cold=cold,
abort=pipelined)
cigar.verfer = verfer
yield cigar
[docs]
def parse(self, ims=None, framed=None, pipeline=None, kvy=None, tvy=None, exc=None, rvy=None, vry=None):
"""
Processes all messages from incoming message stream, ims,
when provided. Otherwise process messages from .ims
Returns when ims is empty.
Convenience executor for .processAllGen when ims is not live, i.e. fixed
Parameters:
ims is bytearray of incoming message stream. May contain one or more
sets each of a serialized message with attached cryptographic
material such as signatures or receipts.
framed is Boolean, True means ims contains only one frame of msg plus
counted attachments instead of stream with multiple messages
pipeline is Boolean, True means use pipeline processor to process
ims msgs when stream incpyludes pipelined count codes.
kvy (Kevery): route KERI KEL message types to this instance
tvy (Tevery): route TEL message types to this instance
exc (Exchanger) route EXN message types to this instance
rvy (Revery): reply (RPY) message handler
vry (Verfifier): credential verifier with wallet storage
New Logic:
Attachments must all have counters so know if txt or bny format for
attachments. So even when framed==True must still have counters.
"""
parsator = self.allParsator(ims=ims,
framed=framed,
pipeline=pipeline,
kvy=kvy,
tvy=tvy,
exc=exc,
rvy=rvy,
vry=vry)
while True:
try:
next(parsator)
except StopIteration:
break
[docs]
def parseOne(self, ims=None, framed=True, pipeline=False, kvy=None, tvy=None, exc=None, rvy=None, vry=None):
"""
Processes one messages from incoming message stream, ims,
when provided. Otherwise process message from .ims
Returns once one message is processed.
Convenience executor for .processOneGen when ims is not live, i.e. fixed
Parameters:
ims is bytearray of serialized incoming message stream.
May contain one or more sets each of a serialized message with
attached cryptographic material such as signatures or receipts.
framed is Boolean, True means ims contains only one frame of msg plus
counted attachments instead of stream with multiple messages
pipeline is Boolean, True means use pipeline processor to process
ims msgs when stream includes pipelined count codes.
kvy (Kevery): route KERI KEL message types to this instance
tvy (Tevery): route TEL message types to this instance
exc (Exchanger) route EXN message types to this instance
rvy (Revery): reply (RPY) message handler
New Logic:
Attachments must all have counters so know if txt or bny format for
attachments. So even when framed==True must still have counters.
"""
parsator = self.onceParsator(ims=ims,
framed=framed,
pipeline=pipeline,
kvy=kvy,
tvy=tvy,
exc=exc,
rvy=rvy,
vry=vry)
while True:
try:
next(parsator)
except StopIteration:
break
[docs]
def allParsator(self, ims=None, framed=None, pipeline=None, kvy=None, tvy=None, exc=None, rvy=None, vry=None):
"""
Returns generator to parse all messages from incoming message stream,
ims until ims is exhausted (empty) then returns.
Generator completes as soon as ims is empty.
If ims not provided then parse messages from .ims
Parameters:
ims is bytearray of incoming message stream. May contain one or more
sets each of a serialized message with attached cryptographic
material such as signatures or receipts.
framed is Boolean, True means ims contains only one frame of msg plus
counted attachments instead of stream with multiple messages
pipeline is Boolean, True means use pipeline processor to process
ims msgs when stream includes pipelined count codes.
kvy (Kevery): route KERI KEL message types to this instance
tvy (Tevery): route TEL message types to this instance
exc (Exchanger) route EXN message types to this instance
rvy (Revery): reply (RPY) message handler
vry (Verfifier): credential verifier with wallet storage
New Logic:
Attachments must all have counters so know if txt or bny format for
attachments. So even when framed==True must still have counters.
"""
if ims is not None: # needs bytearray not bytes since deletes as processes
if not isinstance(ims, bytearray):
ims = bytearray(ims) # so make bytearray copy
else:
ims = self.ims # use instance attribute by default
framed = framed if framed is not None else self.framed
pipeline = pipeline if pipeline is not None else self.pipeline
kvy = kvy if kvy is not None else self.kvy
tvy = tvy if tvy is not None else self.tvy
exc = exc if exc is not None else self.exc
rvy = rvy if rvy is not None else self.rvy
vry = vry if vry is not None else self.vry
while ims: # only process until ims empty
try:
done = yield from self.msgParsator(ims=ims,
framed=framed,
pipeline=pipeline,
kvy=kvy,
tvy=tvy,
exc=exc,
rvy=rvy,
vry=vry)
except kering.SizedGroupError as ex: # error inside sized group
# processOneIter already flushed group so do not flush stream
if logger.isEnabledFor(logging.ERROR):
logger.exception("Parser msg extraction error: %s\n", ex.args[0])
else:
logger.error("Parser msg extraction error: %s\n", ex.args[0])
except (kering.ColdStartError, kering.ExtractionError) as ex: # some extraction error
if logger.isEnabledFor(logging.ERROR):
logger.exception("Parser msg extraction error: %s\n", ex.args[0])
else:
logger.error("Parser msg extraction error: %s\n", ex.args[0])
del ims[:] # delete rest of stream to force cold restart
except (kering.ValidationError, Exception) as ex: # non Extraction Error
# Non extraction errors happen after successfully extracted from stream
# so we don't flush rest of stream just resume
if logger.isEnabledFor(logging.ERROR):
logger.exception("Parser msg non-extraction error: %s\n", ex)
else:
logger.error("Parser msg non-extraction error: %s\n", ex)
yield
return True
[docs]
def onceParsator(self, ims=None, framed=None, pipeline=None, kvy=None, tvy=None, exc=None, rvy=None, vry=None):
"""
Returns generator to parse one message from incoming message stream, ims.
If ims not provided parse messages from .ims
Parameters:
ims is bytearray of incoming message stream. May contain one or more
sets each of a serialized message with attached cryptographic
material such as signatures or receipts.
framed is Boolean, True means ims contains only one frame of msg plus
counted attachments instead of stream with multiple messages
pipeline is Boolean, True means use pipeline processor to process
ims msgs when stream includes pipelined count codes.
kvy (Kevery): route KERI KEL message types to this instance
tvy (Tevery): route TEL message types to this instance
exc (Exchanger) route EXN message types to this instance
rvy (Revery): reply (RPY) message handler
vry (Verfifier): credential verifier with wallet storage
New Logic:
Attachments must all have counters so know if txt or bny format for
attachments. So even when framed==True must still have counters.
"""
if ims is not None: # needs bytearray not bytes since deletes as processes
if not isinstance(ims, bytearray):
ims = bytearray(ims) # so make bytearray copy
else:
ims = self.ims # use instance attribute by default
framed = framed if framed is not None else self.framed
pipeline = pipeline if pipeline is not None else self.pipeline
kvy = kvy if kvy is not None else self.kvy
tvy = tvy if tvy is not None else self.tvy
exc = exc if exc is not None else self.exc
rvy = rvy if rvy is not None else self.rvy
vry = vry if vry is not None else self.vry
done = False
while not done:
try:
done = yield from self.msgParsator(ims=ims,
framed=framed,
pipeline=pipeline,
kvy=kvy,
tvy=tvy,
exc=exc,
rvy=rvy,
vry=vry)
except kering.SizedGroupError as ex: # error inside sized group
# processOneIter already flushed group so do not flush stream
if logger.isEnabledFor(logging.ERROR):
logger.exception("Kevery msg extraction error: %s\n", ex.args[0])
else:
logger.error("Kevery msg extraction error: %s\n", ex.args[0])
except (kering.ColdStartError, kering.ExtractionError) as ex: # some extraction error
if logger.isEnabledFor(logging.ERROR):
logger.exception("Kevery msg extraction error: %s\n", ex.args[0])
else:
logger.error("Kevery msg extraction error: %s\n", ex.args[0])
del ims[:] # delete rest of stream to force cold restart
except (kering.ValidationError, Exception) as ex: # non Extraction Error
# Non extraction errors happen after successfully extracted from stream
# so we don't flush rest of stream just resume
if logger.isEnabledFor(logging.ERROR):
logger.exception("Kevery msg non-extraction error: %s\n", ex.args[0])
else:
logger.error("Kevery msg non-extraction error: %s\n", ex.args[0])
finally:
done = True
return done
[docs]
def parsator(self, ims=None, framed=None, pipeline=None, kvy=None, tvy=None, exc=None, rvy=None, vry=None):
"""
Returns generator to continually parse messages from incoming message
stream, ims. Empty yields when ims is emply.
Useful for always running servers.
One yield from per each message if any.
Continually yields while ims is empty.
If ims not provided then parse messages from .ims
Parameters:
ims is bytearray of incoming message stream. May contain one or more
sets each of a serialized message with attached cryptographic
material such as signatures or receipts.
framed is Boolean, True means ims contains only one frame of msg plus
counted attachments instead of stream with multiple messages
pipeline is Boolean, True means use pipeline processor to process
ims msgs when stream includes pipelined count codes.
kvy (Kevery): route KERI KEL message types to this instance
tvy (Tevery): route TEL message types to this instance
exc (Exchanger) route EXN message types to this instance
rvy (Revery): reply (RPY) message handler
vry (Verifier): credential processor
New Logic:
Attachments must all have counters so know if txt or bny format for
attachments. So even when framed==True must still have counters.
"""
if ims is not None: # needs bytearray not bytes since deletes as processes
if not isinstance(ims, bytearray):
ims = bytearray(ims) # so make bytearray copy
else:
ims = self.ims # use instance attribute by default
framed = framed if framed is not None else self.framed
pipeline = pipeline if pipeline is not None else self.pipeline
kvy = kvy if kvy is not None else self.kvy
tvy = tvy if tvy is not None else self.tvy
exc = exc if exc is not None else self.exc
rvy = rvy if rvy is not None else self.rvy
vry = vry if vry is not None else self.vry
while True: # continuous stream processing never stop
try:
done = yield from self.msgParsator(ims=ims,
framed=framed,
pipeline=pipeline,
kvy=kvy,
tvy=tvy,
exc=exc,
rvy=rvy,
vry=vry)
except kering.SizedGroupError as ex: # error inside sized group
# processOneIter already flushed group so do not flush stream
if logger.isEnabledFor(logging.ERROR):
logger.exception("Parser msg extraction error: %s\n", ex.args[0])
else:
logger.error("Parser msg extraction error: %s\n", ex.args[0])
except (kering.ColdStartError, kering.ExtractionError) as ex: # some extraction error
if logger.isEnabledFor(logging.ERROR):
logger.exception("Parser msg extraction error: %s\n", ex.args[0])
else:
logger.error("Parser msg extraction error: %s\n", ex.args[0])
del ims[:] # delete rest of stream to force cold restart
except (kering.ValidationError, Exception) as ex: # non Extraction Error
# Non extraction errors happen after successfully extracted from stream
# so we don't flush rest of stream just resume
if logger.isEnabledFor(logging.ERROR):
logger.exception("Parser msg non-extraction error: %s\n", ex.args[0])
else:
logger.error("Parser msg non-extraction error: %s\n", ex.args[0])
yield
return True # should never return
[docs]
def msgParsator(self, ims=None, framed=True, pipeline=False,
kvy=None, tvy=None, exc=None, rvy=None, vry=None):
"""
Returns generator that upon each iteration extracts and parses msg
with attached crypto material (signature etc) from incoming message
stream, ims, and dispatches processing of message with attachments.
Uses .ims when ims is not provided.
Iterator yields when not enough bytes in ims to finish one msg plus
attachments. Returns (which raises StopIteration) when finished.
Parameters:
ims (bytearray) of serialized incoming message stream.
May contain one or more sets each of a serialized message with
attached cryptographic material such as signatures or receipts.
framed (bool) True means ims contains only one frame of msg plus
counted attachments instead of stream with multiple messages
pipeline (bool) True means use pipeline processor to process
ims msgs when stream includes pipelined count codes.
kvy (Kevery) route KERI KEL message types to this instance
tvy (Tevery) route TEL message types to this instance
exc (Exchanger) route EXN message types to this instance
rvy (Revery): reply (RPY) message handler
vry (Verifier) ACDC credential processor
Logic:
Currently only support couters on attachments not on combined or
on message
Attachments must all have counters so know if txt or bny format for
attachments. So even when framed==True must still have counter.
Do While loop
sniff to set up first extraction
raise exception and flush full tream if stream start is counter
must be message
extract message
sniff for counter
if group counter extract and discard but keep track of count
so if error while processing attachments then only need to flush
attachment count not full stream.
"""
serdery = serdering.Serdery(version=kering.Version)
if ims is None:
ims = self.ims
while not ims:
yield
cold = self.sniff(ims) # check for spurious counters at front of stream
if cold in (Colds.txt, Colds.bny): # not message error out to flush stream
# replace with pipelining here once CESR message format supported.
raise kering.ColdStartError("Expecting message counter tritet={}"
"".format(cold))
# Otherwise its a message cold start
while True: # extract, deserialize, and strip message from ims
try:
serder = serdery.reap(ims=ims) # can set version here
except kering.ShortageError as ex: # need more bytes
yield
else: # extracted and stripped successfully
break # break out of while loop
#while True: # extract and deserialize message from ims
#try:
#sadder = Sadder(raw=ims)
#except kering.ShortageError as ex: # need more bytes
#yield
#else: # extracted successfully
#del ims[:sadder.size] # strip off event from front of ims
#break
sigers = [] # list of Siger instances of attached indexed controller signatures
wigers = [] # list of Siger instance of attached indexed witness signatures
cigars = [] # List of cigars to hold nontrans rct couplets
# List of tuples from extracted transferable receipt (vrc) quadruples
trqs = [] # each converted quadruple is (prefixer, seqner, diger, siger)
# List of tuples from extracted transferable indexed sig groups
tsgs = [] # each converted group is tuple of (i,s,d) triple plus list of sigs
# List of tuples from extracted signer seals sig groups
ssgs = [] # each converted group is the identifier prefix plus list of sigs
# List of tuples from extracted first seen replay couples
frcs = [] # each converted couple is (seqner, dater)
# List of tuples from extracted source seal couples (delegator or issuer)
sscs = [] # each converted couple is (seqner, diger) for delegating or issuing event
# List of tuples from extracted source seal triples (issuer or issuance tel event)
ssts = [] # each converted couple is (seqner, diger) for delegating or issuing event
# List of tuples from extracted SAD path sig groups from transferable identifiers
sadtsgs = [] # each converted group is tuple of (path, i, s, d) quad plus list of sigs
# List of tuples from extracted SAD path sig groups from non-trans identifiers
sadcigs = [] # each converted group is path plus list of non-trans sigs
pathed = [] # grouped attachments targetting a subpath
pipelined = False # all attachments in one big pipeline counted group
# extract and deserialize attachments
try: # catch errors here to flush only counted part of stream
# extract attachments must start with counter so know if txt or bny.
while not ims:
yield
cold = self.sniff(ims) # expect counter at front of attachments
if cold != Colds.msg: # not new message so process attachments
ctr = yield from self._extractor(ims=ims, klas=Counter, cold=cold)
if ctr.code == CtrDex.AttachedMaterialQuadlets: # pipeline ctr?
pipelined = True
# compute pipelined attached group size based on txt or bny
pags = ctr.count * 4 if cold == Colds.txt else ctr.count * 3
while len(ims) < pags: # wait until rx full pipelned group
yield
pims = ims[:pags] # copy out substream pipeline group
del ims[:pags] # strip off from ims
ims = pims # now just process substream as one counted frame
if pipeline:
pass # pass extracted ims to pipeline processor
return
ctr = yield from self._extractor(ims=ims,
klas=Counter,
cold=cold,
abort=pipelined)
# iteratively process attachment counters (all non pipelined)
while True: # do while already extracted first counter is ctr
if ctr.code == CtrDex.ControllerIdxSigs:
for i in range(ctr.count): # extract each attached signature
siger = yield from self._extractor(ims=ims,
klas=Siger,
cold=cold,
abort=pipelined)
sigers.append(siger)
elif ctr.code == CtrDex.WitnessIdxSigs:
for i in range(ctr.count): # extract each attached signature
wiger = yield from self._extractor(ims=ims,
klas=Siger,
cold=cold,
abort=pipelined)
wigers.append(wiger)
elif ctr.code == CtrDex.NonTransReceiptCouples:
# extract attached rct couplets into list of sigvers
# verfer property of cigar is the identifier prefix
# cigar itself has the attached signature
for cigar in self._nonTransReceiptCouples(ctr=ctr,
ims=ims,
cold=cold,
pipelined=pipelined):
cigars.append(cigar)
elif ctr.code == CtrDex.TransReceiptQuadruples:
# extract attaced trans receipt vrc quadruple
# spre+ssnu+sdig+sig
# spre is pre of signer of vrc
# ssnu is sn of signer's est evt when signed
# sdig is dig of signer's est event when signed
# sig is indexed signature of signer on this event msg
for i in range(ctr.count): # extract each attached quadruple
prefixer = yield from self._extractor(ims,
klas=Prefixer,
cold=cold,
abort=pipelined)
seqner = yield from self._extractor(ims,
klas=Seqner,
cold=cold,
abort=pipelined)
saider = yield from self._extractor(ims,
klas=Saider,
cold=cold,
abort=pipelined)
siger = yield from self._extractor(ims=ims,
klas=Siger,
cold=cold,
abort=pipelined)
trqs.append((prefixer, seqner, saider, siger))
elif ctr.code == CtrDex.TransIdxSigGroups:
# extract attaced trans indexed sig groups each made of
# triple pre+snu+dig plus indexed sig group
# pre is pre of signer (endorser) of msg
# snu is sn of signer's est evt when signed
# dig is dig of signer's est event when signed
# followed by counter for ControllerIdxSigs with attached
# indexed sigs from trans signer (endorser).
for (prefixer, seqner, saider, isigers) in \
self._transIdxSigGroups(ctr, ims, cold=cold,
pipelined=pipelined):
tsgs.append((prefixer, seqner, saider, isigers))
elif ctr.code == CtrDex.TransLastIdxSigGroups:
# extract attaced signer seal indexed sig groups each made of
# identifier pre plus indexed sig group
# pre is pre of signer (endorser) of msg
# followed by counter for ControllerIdxSigs with attached
# indexed sigs from trans signer (endorser).
for i in range(ctr.count): # extract each attached groups
prefixer = yield from self._extractor(ims,
klas=Prefixer,
cold=cold,
abort=pipelined)
ictr = yield from self._extractor(ims=ims,
klas=Counter,
cold=cold,
abort=pipelined)
if ictr.code != CtrDex.ControllerIdxSigs:
raise kering.UnexpectedCountCodeError("Wrong "
"count code={}.Expected code={}."
"".format(ictr.code, CtrDex.ControllerIdxSigs))
isigers = []
for i in range(ictr.count): # extract each attached signature
isiger = yield from self._extractor(ims=ims,
klas=Siger,
cold=cold,
abort=pipelined)
isigers.append(isiger)
ssgs.append((prefixer, isigers))
elif ctr.code == CtrDex.FirstSeenReplayCouples:
# extract attached first seen replay couples
# snu+dtm
# snu is fn (first seen ordinal) of event
# dtm is dt of event
for i in range(ctr.count): # extract each attached quadruple
firner = yield from self._extractor(ims,
klas=Seqner,
cold=cold,
abort=pipelined)
dater = yield from self._extractor(ims,
klas=Dater,
cold=cold,
abort=pipelined)
frcs.append((firner, dater))
elif ctr.code == CtrDex.SealSourceCouples:
# extract attached first seen replay couples
# snu+dig
# snu is sequence number of event
# dig is digest of event
for i in range(ctr.count): # extract each attached quadruple
seqner = yield from self._extractor(ims,
klas=Seqner,
cold=cold,
abort=pipelined)
saider = yield from self._extractor(ims,
klas=Saider,
cold=cold,
abort=pipelined)
sscs.append((seqner, saider))
elif ctr.code == CtrDex.SealSourceTriples:
# extract attached anchoring source event information
# pre+snu+dig
# pre is prefix of event
# snu is sequence number of event
# dig is digest of event
for i in range(ctr.count): # extract each attached quadruple
prefixer = yield from self._extractor(ims,
klas=Prefixer,
cold=cold,
abort=pipelined)
seqner = yield from self._extractor(ims,
klas=Seqner,
cold=cold,
abort=pipelined)
saider = yield from self._extractor(ims,
klas=Saider,
cold=cold,
abort=pipelined)
ssts.append((prefixer, seqner, saider))
elif ctr.code == CtrDex.SadPathSigGroup:
path = yield from self._extractor(ims,
klas=Pather,
cold=cold,
abort=pipelined)
for i in range(ctr.count):
ictr = yield from self._extractor(ims=ims,
klas=Counter,
cold=cold,
abort=pipelined)
for code, sigs in self._sadPathSigGroup(ctr=ictr,
ims=ims,
root=path,
cold=cold,
pipelined=pipelined):
if code == CtrDex.TransIdxSigGroups:
sadtsgs.append(sigs)
else:
sadcigs.append(sigs)
elif ctr.code == CtrDex.SadPathSig:
for code, sigs in self._sadPathSigGroup(ctr=ctr,
ims=ims,
cold=cold,
pipelined=pipelined):
if code == CtrDex.TransIdxSigGroups:
sadtsgs.append(sigs)
else:
sadcigs.append(sigs)
elif ctr.code == CtrDex.PathedMaterialQuadlets: # pathed ctr?
# compute pipelined attached group size based on txt or bny
pags = ctr.count * 4 if cold == Colds.txt else ctr.count * 3
while len(ims) < pags: # wait until rx full pipelned group
yield
pims = ims[:pags] # copy out substream pipeline group
del ims[:pags] # strip off from ims
pathed.append(pims)
else:
raise kering.UnexpectedCountCodeError("Unsupported count"
" code={}.".format(ctr.code))
if pipelined: # process to end of stream (group)
if not ims: # end of pipelined group frame
break
elif framed:
# because not all in one pipeline group, each attachment
# group may switch stream state txt or bny
if not ims: # end of frame
break
cold = self.sniff(ims)
if cold == Colds.msg: # new message so attachments done
break # finished attachments since new message
else: # process until next message
# because not all in one pipeline group, each attachment
# group may switch stream state txt or bny
while not ims:
yield # no frame so must wait for next message
cold = self.sniff(ims) # ctr or msg
if cold == Colds.msg: # new message
break # finished attachments since new message
ctr = yield from self._extractor(ims=ims, klas=Counter, cold=cold)
except kering.ExtractionError as ex:
if pipelined: # extracted pipelined group is preflushed
raise kering.SizedGroupError("Error processing pipelined size"
"attachment group of size={}.".format(pags))
raise # no pipeline group so can't preflush, must flush stream
if isinstance(serder, serdering.SerderKERI):
ilk = serder.ilk # dispatch abased on ilk
#if sadder.proto == Protos.keri:
#serder = Serder(sad=sadder)
#ilk = serder.ked["t"] # dispatch abased on ilk
if ilk in [Ilks.icp, Ilks.rot, Ilks.ixn, Ilks.dip, Ilks.drt]: # event msg
firner, dater = frcs[-1] if frcs else (None, None) # use last one if more than one
# when present assumes this is source seal of delegating event in delegator's KEL
delseqner, delsaider = sscs[-1] if sscs else (None, None) # use last one if more than one
if not sigers:
raise kering.ValidationError("Missing attached signature(s) for evt "
"= {}.".format(serder.ked))
try:
kvy.processEvent(serder=serder,
sigers=sigers,
wigers=wigers,
delseqner=delseqner,
delsaider=delsaider,
firner=firner,
dater=dater)
if cigars:
kvy.processReceiptCouples(serder, cigars, firner=firner)
if trqs:
kvy.processReceiptQuadruples(serder, trqs, firner=firner)
except AttributeError as ex:
raise kering.ValidationError("No kevery to process so dropped msg"
"= {}.".format(serder.pretty())) from ex
elif ilk in [Ilks.rct]: # event receipt msg (nontransferable)
if not (cigars or wigers or tsgs):
raise kering.ValidationError("Missing attached signatures on receipt"
"msg = {}.".format(serder.ked))
try:
if cigars:
kvy.processReceipt(serder=serder, cigars=cigars)
if wigers:
kvy.processReceiptWitness(serder=serder, wigers=wigers)
if tsgs:
kvy.processReceiptTrans(serder=serder, tsgs=tsgs)
except AttributeError:
raise kering.ValidationError("No kevery to process so dropped msg"
"= {}.".format(serder.pretty()))
elif ilk in (Ilks.rpy,): # reply message
if not (cigars or tsgs):
raise kering.ValidationError("Missing attached endorser signature(s) "
"to reply msg = {}.".format(serder.pretty()))
try:
if cigars: # process separately so do not clash on errors
rvy.processReply(serder, cigars=cigars) # nontrans
if tsgs: # process separately so do not clash on errors
rvy.processReply(serder, tsgs=tsgs) # trans
except AttributeError as e:
raise kering.ValidationError("No kevery to process so dropped msg"
"= {}.".format(serder.pretty()))
elif ilk in (Ilks.qry,): # query message
args = dict(serder=serder)
if ssgs:
pre, sigers = ssgs[-1] if ssgs else (None, None) # use last one if more than one
args["source"] = pre
args["sigers"] = sigers
elif cigars:
args["cigars"] = cigars
else:
raise kering.ValidationError("Missing attached requester signature(s) "
"to key log query msg = {}.".format(serder.pretty()))
route = serder.ked["r"]
if route in ["logs", "ksn", "mbx"]:
try:
kvy.processQuery(**args)
except AttributeError:
raise kering.ValidationError("No kevery to process so dropped msg"
"= {}.".format(serder.pretty()))
elif route in ["tels", "tsn"]:
try:
tvy.processQuery(**args)
except AttributeError as e:
raise kering.ValidationError("No tevery to process so dropped msg"
"= {} from {}.".format(serder.pretty(), e))
else:
raise kering.ValidationError("Invalid resource type {} so dropped msg"
"= {}.".format(route, serder.pretty()))
elif ilk in (Ilks.exn,):
args = dict(serder=serder)
if pathed:
args["pathed"] = pathed
try:
if cigars:
exc.processEvent(cigars=cigars, **args)
if tsgs:
exc.processEvent(tsgs=tsgs, **args)
except AttributeError:
raise kering.ValidationError("No Exchange to process so dropped msg"
"= {}.".format(serder.pretty()))
elif ilk in (Ilks.vcp, Ilks.vrt, Ilks.iss, Ilks.rev, Ilks.bis, Ilks.brv):
# TEL msg
# this transaction event seal in Issuer's KEL (controller of Issuer AID)
seqner, saider = sscs[-1] if sscs else (None, None) # use last one if more than one
try:
tvy.processEvent(serder=serder, seqner=seqner, saider=saider, wigers=wigers)
except AttributeError as e:
raise kering.ValidationError("No tevery to process so dropped msg"
"= {}.".format(serder.pretty()))
else:
raise kering.ValidationError("Unexpected message ilk = {} for evt ="
" {}.".format(ilk, serder.pretty()))
elif isinstance(serder, serdering.SerderACDC):
ilk = serder.ilk # dispatch based on ilk
if ilk is None: # default for ACDC
try:
prefixer, seqner, saider = ssts[-1] if ssts else (None, None, None) # use last one if more than one
vry.processCredential(creder=serder, prefixer=prefixer, seqner=seqner, saider=saider)
except AttributeError as e:
raise kering.ValidationError("No verifier to process so dropped credential"
"= {}.".format(serder.pretty()))
else:
raise kering.ValidationError("Unexpected message ilk = {} for evt ="
" {}.".format(ilk, serder.pretty()))
else:
raise kering.ValidationError("Unexpected protocol type = {} for event message ="
" {}.".format(serder.proto, serder.pretty()))
return True # done state