# Source code for keri.core.coring

# -*- encoding: utf-8 -*-
"""
keri.core.coring module

"""
import re
import json
from typing import Union
from collections.abc import Iterable

from dataclasses import dataclass, astuple
from collections import namedtuple, deque
from base64 import urlsafe_b64encode as encodeB64
from base64 import urlsafe_b64decode as decodeB64
from fractions import Fraction

import cbor2 as cbor
import msgpack
import pysodium
import blake3
import hashlib

from cryptography import exceptions
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat
from cryptography.hazmat.primitives.asymmetric import ec, utils

from ..kering import (EmptyMaterialError, RawMaterialError, InvalidCodeError,
                      InvalidCodeSizeError, InvalidVarIndexError,
                      InvalidVarSizeError, InvalidVarRawSizeError,
                      ConversionError, InvalidValueError, InvalidTypeError,
                      ValidationError, VersionError, DerivationError,
                      EmptyListError,
                      ShortageError, UnexpectedCodeError, DeserializeError,
                      UnexpectedCountCodeError, UnexpectedOpCodeError)
from ..kering import (Versionage, Version, VERRAWSIZE, VERFMT, VERFULLSIZE,
                      versify, deversify, Rever)
from ..kering import Serials, Serialage, Protos, Protocolage, Ilkage, Ilks
from ..kering import (ICP_LABELS, DIP_LABELS, ROT_LABELS, DRT_LABELS, IXN_LABELS,
                      RPY_LABELS)
from ..kering import (VCP_LABELS, VRT_LABELS, ISS_LABELS, BIS_LABELS, REV_LABELS,
                      BRV_LABELS, TSN_LABELS, CRED_TSN_LABELS)

from ..help import helping
from ..help.helping import sceil, nonStringIterable


# Per-ilk label lists gathered into one Ilkage instance. The *_LABELS values
# are imported from ..kering; ilks given [] (rct, qry, exn, pro, bar) carry no
# labels here.
# NOTE(review): presumably these are the required field labels for each
# message type -- confirm against ..kering.
Labels = Ilkage(icp=ICP_LABELS, rot=ROT_LABELS, ixn=IXN_LABELS, dip=DIP_LABELS,
                drt=DRT_LABELS, rct=[], qry=[], rpy=RPY_LABELS,
                exn=[], pro=[], bar=[],
                vcp=VCP_LABELS, vrt=VRT_LABELS, iss=ISS_LABELS, rev=REV_LABELS,
                bis=BIS_LABELS, brv=BRV_LABELS)


# ECDSA related constants. Not referenced in the portion of the module above
# the Matter class.
# NOTE(review): presumably consumed by the ECDSA signing/verification code
# further down -- confirm at the call sites.
DSS_SIG_MODE = "fips-186-3"
ECDSA_256r1_SEEDBYTES = 32
ECDSA_256k1_SEEDBYTES = 32


# One version string per serialization kind, each built by versify() with
# size=0.
Vstrings = Serialage(json=versify(kind=Serials.json, size=0),
                     mgpk=versify(kind=Serials.mgpk, size=0),
                     cbor=versify(kind=Serials.cbor, size=0))

# SAID field labels
# Saidage field names are Python-safe aliases; the Saids instance below holds
# the actual label strings ("$id", "@id", "id", "i", "d").
Saidage = namedtuple("Saidage", "dollar at id_ i d")

Saids = Saidage(dollar="$id", at="@id", id_="id", i="i", d="d")

def sizeify(ked, kind=None, version=Version):
    """
    Serialize ked and rewrite its version string so that the size field equals
    the actual length of the serialization.

    Returns:
        tuple: (raw, proto, kind, ked, version) where
            raw (bytes): serialization of ked carrying the corrected version
                string
            proto (str): protocol type extracted from ked["v"]
            kind (str): serialization kind used to produce raw
            ked (dict): the dict passed in, its "v" field updated in place
            version (Versionage): version extracted from ked["v"]

    Parameters:
        ked (dict): key event dict. Must contain a "v" version string field
        kind (str): serialization kind, a value of Serials. When falsy the
            kind embedded in ked["v"] is used
        version (Versionage): the only protocol version accepted

    Raises:
        ValueError: when "v" is missing, when the embedded version differs from
            version, when kind is not in Serials, when no version string
            starts at offset 12 or earlier in the serialization, or when
            substituting the new version string changes the serialized length
    """
    if "v" not in ked:
        raise ValueError("Missing or empty version string in key event "
                         "dict = {}".format(ked))

    # size embedded in the old version string is ignored, it gets recomputed
    proto, vrsn, embedded, _ = deversify(ked["v"])
    if vrsn != version:
        raise ValueError("Unsupported version = {}.{}".format(vrsn.major, vrsn.minor))

    kind = kind or embedded  # fall back to the kind carried by ked["v"]
    if kind not in Serials:
        raise ValueError("Invalid serialization kind = {}".format(kind))

    raw = dumps(ked, kind)
    size = len(raw)

    found = Rever.search(raw)  # Rever's regex takes bytes
    if not found or found.start() > 12:
        raise ValueError("Invalid version string in raw = {}".format(raw))
    begin, end = found.span()  # extent of the full version string in raw

    # version string with the now known kind, version and size
    vs = versify(proto=proto, version=vrsn, kind=kind, size=size)
    # splice the new version string over the old one
    raw = b"".join((raw[:begin], vs.encode("utf-8"), raw[end:]))
    if len(raw) != size:  # substitution changed the length
        raise ValueError("Malformed version string size = {}".format(vs))

    ked["v"] = vs  # keep the dict in sync with raw
    return raw, proto, kind, ked, vrsn
# Base64 utilities
BASE64_PAD = b'='

# Mappings between Base64 Encode Index and Decode Characters
#   B64ChrByIdx is dict where each key is a B64 index and each value is the B64 char
#   B64IdxByChr is dict where each key is a B64 char and each value is the B64 index
# Map Base64 index to char. Insertion order is index order 0..63 using the
# URL safe alphabet: 'A'-'Z', 'a'-'z', '0'-'9', '-', '_'
B64ChrByIdx = dict(enumerate(
    [chr(x) for x in range(65, 91)]       # 'A'-'Z' are indices 0-25
    + [chr(x) for x in range(97, 123)]    # 'a'-'z' are indices 26-51
    + [chr(x) for x in range(48, 58)]     # '0'-'9' are indices 52-61
    + ['-', '_']))                        # indices 62 and 63
# Map char to Base64 index
B64IdxByChr = {char: index for index, char in B64ChrByIdx.items()}
B64_CHARS = tuple(B64ChrByIdx.values())  # tuple of characters in Base64

# Raw bytes literal: same pattern bytes as before, but without relying on
# invalid string escape sequences (\-, \_, \Z) which newer Python versions
# warn about.
B64REX = rb'^[A-Za-z0-9\-\_]*\Z'
Reb64 = re.compile(B64REX)  # compile is faster
def intToB64(i, l=1):
    """
    Returns conversion of non-negative int i to Base64 str

    Parameters:
        i (int): non-negative number to convert
        l (int): minimum number of Base64 digits. Result is left padded with
            Base64 0 == "A" char up to l digits. When l is 0 the result is
            the empty str.

    Raises:
        ValueError: when i is negative. Floor division never brings a negative
            int to zero so the conversion loop would otherwise never end.
    """
    if i < 0:
        raise ValueError("Negative int = {}, conversion undefined.".format(i))

    d = deque()  # deque of characters base64, most significant on the left
    while l:
        d.appendleft(B64ChrByIdx[i % 64])  # least significant sextet first
        i = i // 64
        if not i:
            break
    # multiplying by a count <= 0 yields "" so no padding is added
    d.extendleft("A" * (l - len(d)))
    return "".join(d)
def intToB64b(i, l=1):
    """
    Returns conversion of int i to Base64 as utf-8 encoded bytes

    Parameters:
        i (int): number to convert
        l (int): minimum number of Base64 digits, left padded with
            Base64 0 == "A" char
    """
    text = intToB64(i=i, l=l)
    return text.encode("utf-8")
def b64ToInt(s):
    """
    Returns conversion of Base64 str or bytes s to int

    Parameters:
        s (str | bytes): Base64 characters, most significant first

    Raises:
        ValueError: when s is empty
    """
    if not s:
        raise ValueError("Empty string, conversion undefined.")
    if hasattr(s, 'decode'):
        s = s.decode("utf-8")
    # each char contributes one sextet weighted by its position from the right
    return sum(B64IdxByChr[char] << (6 * place)
               for place, char in enumerate(reversed(s)))
def codeB64ToB2(s):
    """
    Returns conversion (decode) of Base64 chars s to Base2 bytes.

    Each Base64 char of s becomes one sextet. The sextets are concatenated and
    left aligned in the minimum number of octets that can hold them, with any
    pad bits in the last (rightmost) byte. Assumes no pad chars in s.

    Useful for decoding, as bytes, code characters from the front of a Base64
    encoded string of characters.
    """
    value = b64ToInt(s)
    sextets = len(s)
    value = value << (2 * (sextets % 4))  # 2 bits of right zero pad per sextet
    octets = sceil(sextets * 3 / 4)  # min number of octets to hold all sextets
    return value.to_bytes(octets, 'big')
def codeB2ToB64(b, l):
    """
    Returns conversion (encode) of the first l Base2 sextets of b to Base64
    chars, one char per sextet taken from the front (left) of b.

    Useful for encoding, as code characters, sextets from the front of a Base2
    byte string. l must be supplied because l=3 and l=4 both need 3 bytes of b
    and so cannot be told apart from b alone.

    Parameters:
        b (bytes | str): source material. A str is utf-8 encoded first
        l (int): number of sextets to convert

    Raises:
        ValueError: when b holds fewer bytes than l sextets require
    """
    if hasattr(b, 'encode'):
        b = b.encode("utf-8")  # convert to bytes
    needed = sceil(l * 3 / 4)  # number of bytes needed for l sextets
    if len(b) < needed:
        raise ValueError("Not enough bytes in {} to nab {} sextets.".format(b, l))
    # shift out the trailing bits so the l sextets end up right aligned
    value = int.from_bytes(b[:needed], 'big') >> (2 * (l % 4))
    return intToB64(value, l)
def nabSextets(b, l):
    """
    Returns the first l sextets from the front (left) of b as bytes.

    The result is the minimum number of bytes able to hold l sextets; the
    unused low bits of the last byte are zero.

    Parameters:
        b (bytes | str): source material. A str is utf-8 encoded first
        l (int): number of sextets to keep

    Raises:
        ValueError: when b holds fewer bytes than l sextets require
    """
    if hasattr(b, 'encode'):
        b = b.encode("utf-8")  # convert to bytes
    needed = sceil(l * 3 / 4)  # number of bytes needed for l sextets
    if len(b) < needed:
        raise ValueError("Not enough bytes in {} to nab {} sextets.".format(b, l))
    spare = 2 * (l % 4)  # bits past the last wanted sextet
    value = int.from_bytes(b[:needed], 'big')
    value = (value >> spare) << spare  # zero out the spare trailing bits
    return value.to_bytes(needed, 'big')
# Smallest buffer length sniff() accepts before searching for a version
# string: 12 is the largest start offset sniff() tolerates for the match, plus
# VERFULLSIZE (imported from ..kering).
MINSNIFFSIZE = 12 + VERFULLSIZE # min bytes in buffer to sniff else need more
def sniff(raw):
    """
    Returns tuple (proto, kind, version, size) read from the version string
    found in the leading bytes of serialized event raw.

    Parameters:
        raw (bytes): serialized event

    Raises:
        ShortageError: when raw is shorter than MINSNIFFSIZE
        VersionError: when no version string starts at offset 12 or earlier
        DeserializeError: when the serialization kind is not in Serials
    """
    if len(raw) < MINSNIFFSIZE:
        raise ShortageError("Need more bytes.")

    found = Rever.search(raw)  # Rever's regex takes bytes
    if not found or found.start() > 12:
        raise VersionError("Invalid version string in raw = {}".format(raw))

    # major, minor and size are hex encoded in the version string
    version = Versionage(major=int(found["major"], 16),
                         minor=int(found["minor"], 16))
    kind = found["kind"].decode("utf-8")
    proto = found["proto"].decode("utf-8")
    if kind not in Serials:
        raise DeserializeError("Invalid serialization kind = {}".format(kind))

    return proto, kind, version, int(found["size"], 16)
def dumps(ked, kind=Serials.json):
    """
    Utility function that serializes ked with the serializer selected by kind.

    Returns:
        raw (bytes): serialized version of ked

    Parameters:
        ked (Optional(dict, list)): key event dict or message dict to serialize
        kind (str): serialization kind (JSON, MGPK, CBOR)

    Raises:
        ValueError: when kind is not one of the supported kinds
    """
    if kind == Serials.json:
        # compact separators, non-ASCII characters emitted as utf-8
        text = json.dumps(ked, separators=(",", ":"), ensure_ascii=False)
        return text.encode("utf-8")

    if kind == Serials.mgpk:
        return msgpack.dumps(ked)

    if kind == Serials.cbor:
        return cbor.dumps(ked)

    raise ValueError("Invalid serialization kind = {}".format(kind))
def loads(raw, size=None, kind=Serials.json):
    """
    Utility function to handle deserialization by kind

    Returns:
        ked (dict): deserialized

    Parameters:
        raw (Union[bytes,bytearray]): raw serialization to deserialize as dict
        size (int): number of bytes to consume for the deserialization.
            If None then consume all bytes
        kind (str): serialization kind (JSON, MGPK, CBOR)

    Raises:
        DeserializeError: when the first size bytes of raw cannot be
            deserialized as kind, or when kind is not a supported kind. The
            underlying exception is attached as the cause.
    """
    if kind == Serials.json:
        try:
            ked = json.loads(raw[:size].decode("utf-8"))
        except Exception as ex:
            # The failure may itself be a utf-8 decode error, so build the
            # message with a lenient decode; a strict decode here would raise
            # UnicodeDecodeError instead of DeserializeError.
            raise DeserializeError("Error deserializing JSON: {}"
                                   "".format(raw[:size].decode("utf-8",
                                                               errors="replace"))) from ex

    elif kind == Serials.mgpk:
        try:
            ked = msgpack.loads(raw[:size])
        except Exception as ex:
            raise DeserializeError("Error deserializing MGPK: {}"
                                   "".format(raw[:size])) from ex

    elif kind == Serials.cbor:
        try:
            ked = cbor.loads(raw[:size])
        except Exception as ex:
            raise DeserializeError("Error deserializing CBOR: {}"
                                   "".format(raw[:size])) from ex

    else:
        raise DeserializeError("Invalid deserialization kind: {}"
                               "".format(kind))

    return ked
def generateSigners(salt=None, count=8, transferable=True):
    """
    Returns list of count Signers whose seeds are stretched from salt.

    Parameters:
        salt (bytes): root material from which the seed of every Signer in the
            list is derived. A random salt of
            pysodium.crypto_pwhash_SALTBYTES bytes is created when not provided
        count (int): number of signers in list
        transferable (bool): passed through to each Signer. True means
            signer.verfer code is transferable, non-transferable otherwise
    """
    if not salt:
        salt = pysodium.randombytes(pysodium.crypto_pwhash_SALTBYTES)

    # Signer i is seeded from the hex form of its index stretched with salt
    # using argon2id.
    return [
        Signer(raw=pysodium.crypto_pwhash(outlen=32,
                                          passwd=format(index, "x"),
                                          salt=salt,
                                          opslimit=2,  # pysodium.crypto_pwhash_OPSLIMIT_INTERACTIVE
                                          memlimit=67108864,  # pysodium.crypto_pwhash_MEMLIMIT_INTERACTIVE
                                          alg=pysodium.crypto_pwhash_ALG_ARGON2ID13),
               transferable=transferable)
        for index in range(count)
    ]
def generatePrivates(salt=None, count=8):
    """
    Returns list of the fully qualified Base64 (qb64) values of the Signers
    produced by generateSigners, i.e. the private seeds.

    Parameters:
        salt (bytes): root material from which the seeds are derived.
            A random salt is created when not provided
        count (int): number of seeds in list
    """
    privates = []
    for signer in generateSigners(salt=salt, count=count):
        privates.append(signer.qb64)  # signer qb64 is the private key
    return privates
def generatePublics(salt=None, count=8, transferable=True):
    """
    Returns list of the fully qualified Base64 (qb64) verfer values of the
    Signers produced by generateSigners, i.e. the public verification keys.

    Parameters:
        salt (bytes): root material from which the signers are derived.
            A random salt is created when not provided
        count (int): number of keys in list
        transferable (bool): passed through to generateSigners. True means
            each verfer code is transferable, non-transferable otherwise
    """
    publics = []
    for signer in generateSigners(salt=salt, count=count,
                                  transferable=transferable):
        publics.append(signer.verfer.qb64)  # verfer qb64 is the public key
    return publics
# Security tiers for secret derivation. Each field of Tiers holds its own
# name as its value.
Tierage = namedtuple("Tierage", ["low", "med", "high"])

Tiers = Tierage(low="low", med="med", high="high")
@dataclass(frozen=True)
class MatterCodex:
    """
    MatterCodex is codex code (stable) part of all matter derivation codes.
    Only provide defined codes.
    Undefined are left out so that inclusion(exclusion) via 'in' operator works.

    Every code value is unique. The X25519_Cipher_QB2_* entries use the
    '4E', '5E', '6E', '7AAE', '8AAE', '9AAE' codes, the same values carried by
    CipherX25519QB2VarCodex and present as keys of Matter.Sizes, so that they
    do not collide with the X25519_Cipher_QB64_* codes.
    """
    Ed25519_Seed: str = 'A'  # Ed25519 256 bit random seed for private key
    Ed25519N: str = 'B'  # Ed25519 verification key non-transferable, basic derivation.
    X25519: str = 'C'  # X25519 public encryption key, converted from Ed25519 or Ed25519N.
    Ed25519: str = 'D'  # Ed25519 verification key basic derivation
    Blake3_256: str = 'E'  # Blake3 256 bit digest self-addressing derivation.
    Blake2b_256: str = 'F'  # Blake2b 256 bit digest self-addressing derivation.
    Blake2s_256: str = 'G'  # Blake2s 256 bit digest self-addressing derivation.
    SHA3_256: str = 'H'  # SHA3 256 bit digest self-addressing derivation.
    SHA2_256: str = 'I'  # SHA2 256 bit digest self-addressing derivation.
    ECDSA_256k1_Seed: str = 'J'  # ECDSA secp256k1 256 bit random Seed for private key
    Ed448_Seed: str = 'K'  # Ed448 448 bit random Seed for private key
    X448: str = 'L'  # X448 public encryption key, converted from Ed448
    Short: str = 'M'  # Short 2 byte b2 number
    Big: str = 'N'  # Big 8 byte b2 number
    X25519_Private: str = 'O'  # X25519 private decryption key converted from Ed25519
    X25519_Cipher_Seed: str = 'P'  # X25519 sealed box 124 char qb64 Cipher of 44 char qb64 Seed
    ECDSA_256r1_Seed: str = "Q"  # ECDSA secp256r1 256 bit random Seed for private key
    Tall: str = 'R'  # Tall 5 byte b2 number
    Large: str = 'S'  # Large 11 byte b2 number
    Great: str = 'T'  # Great 14 byte b2 number
    Vast: str = 'U'  # Vast 17 byte b2 number
    Label1: str = 'V'  # Label1 as one char (bytes) field map label lead size 1
    Label2: str = 'W'  # Label2 as two char (bytes) field map label lead size 0
    Tag3: str = 'X'  # Tag3 3 B64 encoded chars for field tag or packet type, semver, trait like 'DND'
    Tag7: str = 'Y'  # Tag7 7 B64 encoded chars for field tag or packet kind and version KERIVVV
    Salt_128: str = '0A'  # 128 bit random salt or 128 bit number (see Huge)
    Ed25519_Sig: str = '0B'  # Ed25519 signature.
    ECDSA_256k1_Sig: str = '0C'  # ECDSA secp256k1 signature.
    Blake3_512: str = '0D'  # Blake3 512 bit digest self-addressing derivation.
    Blake2b_512: str = '0E'  # Blake2b 512 bit digest self-addressing derivation.
    SHA3_512: str = '0F'  # SHA3 512 bit digest self-addressing derivation.
    SHA2_512: str = '0G'  # SHA2 512 bit digest self-addressing derivation.
    Long: str = '0H'  # Long 4 byte b2 number
    ECDSA_256r1_Sig: str = '0I'  # ECDSA secp256r1 signature.
    Tag1: str = '0J'  # Tag1 1 B64 encoded char with pre pad for field tag
    Tag2: str = '0K'  # Tag2 2 B64 encoded chars for field tag or version VV or trait like 'EO'
    Tag5: str = '0L'  # Tag5 5 B64 encoded chars with pre pad for field tag
    Tag6: str = '0M'  # Tag6 6 B64 encoded chars for field tag or protocol kind version like KERIVV (KERI 1.1) or KKKVVV
    ECDSA_256k1N: str = '1AAA'  # ECDSA secp256k1 verification key non-transferable, basic derivation.
    ECDSA_256k1: str = '1AAB'  # ECDSA public verification or encryption key, basic derivation
    Ed448N: str = '1AAC'  # Ed448 non-transferable prefix public signing verification key. Basic derivation.
    Ed448: str = '1AAD'  # Ed448 public signing verification key. Basic derivation.
    Ed448_Sig: str = '1AAE'  # Ed448 signature. Self-signing derivation.
    Tag4: str = '1AAF'  # Tag4 4 B64 encoded chars for field tag or message kind
    DateTime: str = '1AAG'  # Base64 custom encoded 32 char ISO-8601 DateTime
    X25519_Cipher_Salt: str = '1AAH'  # X25519 sealed box 100 char qb64 Cipher of 24 char qb64 Salt
    ECDSA_256r1N: str = '1AAI'  # ECDSA secp256r1 verification key non-transferable, basic derivation.
    ECDSA_256r1: str = '1AAJ'  # ECDSA secp256r1 verification or encryption key, basic derivation
    Null: str = '1AAK'  # Null None or empty value
    Yes: str = '1AAL'  # Yes Truthy Boolean value
    No: str = '1AAM'  # No Falsey Boolean value
    TBD1: str = '2AAA'  # Testing purposes only fixed with lead size 1
    TBD2: str = '3AAA'  # Testing purposes only of fixed with lead size 2
    StrB64_L0: str = '4A'  # String Base64 only lead size 0
    StrB64_L1: str = '5A'  # String Base64 only lead size 1
    StrB64_L2: str = '6A'  # String Base64 only lead size 2
    StrB64_Big_L0: str = '7AAA'  # String Base64 only big lead size 0
    StrB64_Big_L1: str = '8AAA'  # String Base64 only big lead size 1
    StrB64_Big_L2: str = '9AAA'  # String Base64 only big lead size 2
    Bytes_L0: str = '4B'  # Byte String lead size 0
    Bytes_L1: str = '5B'  # Byte String lead size 1
    Bytes_L2: str = '6B'  # Byte String lead size 2
    Bytes_Big_L0: str = '7AAB'  # Byte String big lead size 0
    Bytes_Big_L1: str = '8AAB'  # Byte String big lead size 1
    Bytes_Big_L2: str = '9AAB'  # Byte String big lead size 2
    X25519_Cipher_L0: str = '4C'  # X25519 sealed box cipher bytes of sniffable plaintext lead size 0
    X25519_Cipher_L1: str = '5C'  # X25519 sealed box cipher bytes of sniffable plaintext lead size 1
    X25519_Cipher_L2: str = '6C'  # X25519 sealed box cipher bytes of sniffable plaintext lead size 2
    X25519_Cipher_Big_L0: str = '7AAC'  # X25519 sealed box cipher bytes of sniffable plaintext big lead size 0
    X25519_Cipher_Big_L1: str = '8AAC'  # X25519 sealed box cipher bytes of sniffable plaintext big lead size 1
    X25519_Cipher_Big_L2: str = '9AAC'  # X25519 sealed box cipher bytes of sniffable plaintext big lead size 2
    X25519_Cipher_QB64_L0: str = '4D'  # X25519 sealed box cipher bytes of QB64 plaintext lead size 0
    X25519_Cipher_QB64_L1: str = '5D'  # X25519 sealed box cipher bytes of QB64 plaintext lead size 1
    X25519_Cipher_QB64_L2: str = '6D'  # X25519 sealed box cipher bytes of QB64 plaintext lead size 2
    X25519_Cipher_QB64_Big_L0: str = '7AAD'  # X25519 sealed box cipher bytes of QB64 plaintext big lead size 0
    X25519_Cipher_QB64_Big_L1: str = '8AAD'  # X25519 sealed box cipher bytes of QB64 plaintext big lead size 1
    X25519_Cipher_QB64_Big_L2: str = '9AAD'  # X25519 sealed box cipher bytes of QB64 plaintext big lead size 2
    X25519_Cipher_QB2_L0: str = '4E'  # X25519 sealed box cipher bytes of QB2 plaintext lead size 0
    X25519_Cipher_QB2_L1: str = '5E'  # X25519 sealed box cipher bytes of QB2 plaintext lead size 1
    X25519_Cipher_QB2_L2: str = '6E'  # X25519 sealed box cipher bytes of QB2 plaintext lead size 2
    X25519_Cipher_QB2_Big_L0: str = '7AAE'  # X25519 sealed box cipher bytes of QB2 plaintext big lead size 0
    X25519_Cipher_QB2_Big_L1: str = '8AAE'  # X25519 sealed box cipher bytes of QB2 plaintext big lead size 1
    X25519_Cipher_QB2_Big_L2: str = '9AAE'  # X25519 sealed box cipher bytes of QB2 plaintext big lead size 2

    def __iter__(self):
        return iter(astuple(self))  # enables inclusion test with "in"


MtrDex = MatterCodex()  # Make instance
@dataclass(frozen=True)
class SmallVarRawSizeCodex:
    """
    SmallVarRawSizeCodex holds the selector characters of the three small
    variable raw size tables. The three tables act as one table, differing
    only in lead byte size.

    Only defined codes are provided. Undefined codes are left out so that
    inclusion (exclusion) tests via the 'in' operator work.
    """
    Lead0: str = '4'  # first selector character of every ls == 0 code
    Lead1: str = '5'  # first selector character of every ls == 1 code
    Lead2: str = '6'  # first selector character of every ls == 2 code

    def __iter__(self):
        """Iterate over the code values so that 'in' tests work."""
        values = astuple(self)
        return iter(values)


SmallVrzDex = SmallVarRawSizeCodex()  # Make instance
@dataclass(frozen=True)
class LargeVarRawSizeCodex:
    """
    LargeVarRawSizeCodex holds the selector characters of the three large
    variable raw size tables. The three tables act as one table, differing
    only in lead byte size.

    Only defined codes are provided. Undefined codes are left out so that
    inclusion (exclusion) tests via the 'in' operator work.
    """
    Lead0_Big: str = '7'  # first selector character of every ls == 0 code
    Lead1_Big: str = '8'  # first selector character of every ls == 1 code
    Lead2_Big: str = '9'  # first selector character of every ls == 2 code

    def __iter__(self):
        """Iterate over the code values so that 'in' tests work."""
        values = astuple(self)
        return iter(values)


LargeVrzDex = LargeVarRawSizeCodex()  # Make instance
@dataclass(frozen=True)
class NonTransCodex:
    """
    NonTransCodex holds every non-transferable derivation code.

    Only defined codes are provided. Undefined codes are left out so that
    inclusion (exclusion) tests via the 'in' operator work.
    """
    Ed25519N: str = 'B'  # Ed25519 verification key non-transferable, basic derivation.
    ECDSA_256k1N: str = '1AAA'  # ECDSA secp256k1 verification key non-transferable, basic derivation.
    Ed448N: str = '1AAC'  # Ed448 non-transferable prefix public signing verification key. Basic derivation.
    ECDSA_256r1N: str = "1AAI"  # ECDSA secp256r1 verification key non-transferable, basic derivation.

    def __iter__(self):
        """Iterate over the code values so that 'in' tests work."""
        values = astuple(self)
        return iter(values)


NonTransDex = NonTransCodex()  # Make instance


# When add new to DigCodes update Saider.Digests and Serder.Digests class attr
@dataclass(frozen=True)
class DigCodex:
    """
    DigCodex holds every digest derivation code. It is needed to ensure
    delegated inception using a self-addressing derivation, i.e. a digest
    derivation code.

    Only defined codes are provided. Undefined codes are left out so that
    inclusion (exclusion) tests via the 'in' operator work.
    """
    # 256 bit digests
    Blake3_256: str = 'E'  # Blake3 256 bit digest self-addressing derivation.
    Blake2b_256: str = 'F'  # Blake2b 256 bit digest self-addressing derivation.
    Blake2s_256: str = 'G'  # Blake2s 256 bit digest self-addressing derivation.
    SHA3_256: str = 'H'  # SHA3 256 bit digest self-addressing derivation.
    SHA2_256: str = 'I'  # SHA2 256 bit digest self-addressing derivation.
    # 512 bit digests
    Blake3_512: str = '0D'  # Blake3 512 bit digest self-addressing derivation.
    Blake2b_512: str = '0E'  # Blake2b 512 bit digest self-addressing derivation.
    SHA3_512: str = '0F'  # SHA3 512 bit digest self-addressing derivation.
    SHA2_512: str = '0G'  # SHA2 512 bit digest self-addressing derivation.

    def __iter__(self):
        """Iterate over the code values so that 'in' tests work."""
        values = astuple(self)
        return iter(values)


DigDex = DigCodex()  # Make instance
@dataclass(frozen=True)
class NumCodex:
    """
    NumCodex holds the Base64 derivation codes for compactly representing
    numbers across a wide range of sizes.

    Only defined codes are provided. Undefined codes are left out so that
    inclusion (exclusion) tests via the 'in' operator work.
    """
    Short: str = 'M'  # Short 2 byte b2 number
    Long: str = '0H'  # Long 4 byte b2 number
    Big: str = 'N'  # Big 8 byte b2 number
    Huge: str = '0A'  # Huge 16 byte b2 number (same as Salt_128)

    def __iter__(self):
        """Iterate over the code values so that 'in' tests work."""
        values = astuple(self)
        return iter(values)


NumDex = NumCodex()  # Make instance
@dataclass(frozen=True)
class BextCodex:
    """
    BextCodex holds every variable sized Base64 Text (Bext) derivation code.

    Only defined codes are provided. Undefined codes are left out so that
    inclusion (exclusion) tests via the 'in' operator work.
    """
    # small codes
    StrB64_L0: str = '4A'  # String Base64 Only Leader Size 0
    StrB64_L1: str = '5A'  # String Base64 Only Leader Size 1
    StrB64_L2: str = '6A'  # String Base64 Only Leader Size 2
    # big codes
    StrB64_Big_L0: str = '7AAA'  # String Base64 Only Big Leader Size 0
    StrB64_Big_L1: str = '8AAA'  # String Base64 Only Big Leader Size 1
    StrB64_Big_L2: str = '9AAA'  # String Base64 Only Big Leader Size 2

    def __iter__(self):
        """Iterate over the code values so that 'in' tests work."""
        values = astuple(self)
        return iter(values)


BexDex = BextCodex()  # Make instance
@dataclass(frozen=True)
class TextCodex:
    """
    TextCodex holds every variable sized byte string (Text) derivation code.

    Only defined codes are provided. Undefined codes are left out so that
    inclusion (exclusion) tests via the 'in' operator work.
    """
    # small codes
    Bytes_L0: str = '4B'  # Byte String lead size 0
    Bytes_L1: str = '5B'  # Byte String lead size 1
    Bytes_L2: str = '6B'  # Byte String lead size 2
    # big codes
    Bytes_Big_L0: str = '7AAB'  # Byte String big lead size 0
    Bytes_Big_L1: str = '8AAB'  # Byte String big lead size 1
    Bytes_Big_L2: str = '9AAB'  # Byte String big lead size 2

    def __iter__(self):
        """Iterate over the code values so that 'in' tests work."""
        values = astuple(self)
        return iter(values)


TexDex = TextCodex()  # Make instance
@dataclass(frozen=True)
class CipherX25519VarCodex:
    """
    CipherX25519VarCodex is codex all variable sized cipher bytes derivation
    codes for sealed box encrypted ciphertext of sniffable plaintext.

    The values equal those of the identically named X25519_Cipher_* entries of
    MatterCodex ('4C' family). The '4D' family belongs to the
    X25519_Cipher_QB64_* entries and must not appear here, otherwise a QB64
    plaintext cipher code would also test as a member of this codex.

    Only provide defined codes.
    Undefined are left out so that inclusion(exclusion) via 'in' operator works.
    """
    X25519_Cipher_L0: str = '4C'  # X25519 sealed box cipher bytes of sniffable plaintext lead size 0
    X25519_Cipher_L1: str = '5C'  # X25519 sealed box cipher bytes of sniffable plaintext lead size 1
    X25519_Cipher_L2: str = '6C'  # X25519 sealed box cipher bytes of sniffable plaintext lead size 2
    X25519_Cipher_Big_L0: str = '7AAC'  # X25519 sealed box cipher bytes of sniffable plaintext big lead size 0
    X25519_Cipher_Big_L1: str = '8AAC'  # X25519 sealed box cipher bytes of sniffable plaintext big lead size 1
    X25519_Cipher_Big_L2: str = '9AAC'  # X25519 sealed box cipher bytes of sniffable plaintext big lead size 2

    def __iter__(self):
        return iter(astuple(self))  # enables inclusion test with "in"


CiXVarDex = CipherX25519VarCodex()  # Make instance
@dataclass(frozen=True)
class CipherX25519FixQB64Codex:
    """
    CipherX25519FixQB64Codex holds every fixed sized cipher bytes derivation
    code for sealed box encrypted ciphertext. Plaintext is B64.

    Only defined codes are provided. Undefined codes are left out so that
    inclusion (exclusion) tests via the 'in' operator work.
    """
    X25519_Cipher_Seed: str = 'P'  # X25519 sealed box 124 char qb64 Cipher of 44 char qb64 Seed
    X25519_Cipher_Salt: str = '1AAH'  # X25519 sealed box 100 char qb64 Cipher of 24 char qb64 Salt

    def __iter__(self):
        """Iterate over the code values so that 'in' tests work."""
        values = astuple(self)
        return iter(values)


CiXFixQB64Dex = CipherX25519FixQB64Codex()  # Make instance
@dataclass(frozen=True)
class CipherX25519VarQB64Codex:
    """
    CipherX25519VarQB64Codex is codex all variable sized cipher bytes derivation
    codes for sealed box encrypted ciphertext. Plaintext is QB64.

    The values equal those of the identically named X25519_Cipher_QB64_*
    entries of MatterCodex ('4D' family). The lead size 1 and 2 codes are
    '5D' and '6D'; '5E' and '6E' are QB2 plaintext codes held by
    CipherX25519QB2VarCodex and must not appear here.

    Only provide defined codes.
    Undefined are left out so that inclusion(exclusion) via 'in' operator works.
    """
    X25519_Cipher_QB64_L0: str = '4D'  # X25519 sealed box cipher bytes of QB64 plaintext lead size 0
    X25519_Cipher_QB64_L1: str = '5D'  # X25519 sealed box cipher bytes of QB64 plaintext lead size 1
    X25519_Cipher_QB64_L2: str = '6D'  # X25519 sealed box cipher bytes of QB64 plaintext lead size 2
    X25519_Cipher_QB64_Big_L0: str = '7AAD'  # X25519 sealed box cipher bytes of QB64 plaintext big lead size 0
    X25519_Cipher_QB64_Big_L1: str = '8AAD'  # X25519 sealed box cipher bytes of QB64 plaintext big lead size 1
    X25519_Cipher_QB64_Big_L2: str = '9AAD'  # X25519 sealed box cipher bytes of QB64 plaintext big lead size 2

    def __iter__(self):
        return iter(astuple(self))  # enables inclusion test with "in"


CiXVarQB64Dex = CipherX25519VarQB64Codex()  # Make instance
@dataclass(frozen=True)
class CipherX25519AllQB64Codex:
    """
    CipherX25519AllQB64Codex is codex all both fixed and variable sized cipher
    bytes derivation codes for sealed box encrypted ciphertext.
    Plaintext is B64.

    The values equal those of the identically named entries of MatterCodex.
    The variable sized lead size 1 and 2 codes are '5D' and '6D'; '5E' and
    '6E' are QB2 plaintext codes held by CipherX25519QB2VarCodex and must not
    appear here.

    Only provide defined codes.
    Undefined are left out so that inclusion(exclusion) via 'in' operator works.
    """
    X25519_Cipher_Seed: str = 'P'  # X25519 sealed box 124 char qb64 Cipher of 44 char qb64 Seed
    X25519_Cipher_Salt: str = '1AAH'  # X25519 sealed box 100 char qb64 Cipher of 24 char qb64 Salt
    X25519_Cipher_QB64_L0: str = '4D'  # X25519 sealed box cipher bytes of QB64 plaintext lead size 0
    X25519_Cipher_QB64_L1: str = '5D'  # X25519 sealed box cipher bytes of QB64 plaintext lead size 1
    X25519_Cipher_QB64_L2: str = '6D'  # X25519 sealed box cipher bytes of QB64 plaintext lead size 2
    X25519_Cipher_QB64_Big_L0: str = '7AAD'  # X25519 sealed box cipher bytes of QB64 plaintext big lead size 0
    X25519_Cipher_QB64_Big_L1: str = '8AAD'  # X25519 sealed box cipher bytes of QB64 plaintext big lead size 1
    X25519_Cipher_QB64_Big_L2: str = '9AAD'  # X25519 sealed box cipher bytes of QB64 plaintext big lead size 2

    def __iter__(self):
        return iter(astuple(self))  # enables inclusion test with "in"


CiXAllQB64Dex = CipherX25519AllQB64Codex()  # Make instance
@dataclass(frozen=True)
class CipherX25519QB2VarCodex:
    """
    CipherX25519QB2VarCodex holds every variable sized cipher bytes derivation
    code for sealed box encrypted ciphertext. Plaintext is B2.

    Only defined codes are provided. Undefined codes are left out so that
    inclusion (exclusion) tests via the 'in' operator work.
    """
    # small codes
    X25519_Cipher_L0: str = '4E'  # X25519 sealed box cipher bytes of QB2 plaintext lead size 0
    X25519_Cipher_L1: str = '5E'  # X25519 sealed box cipher bytes of QB2 plaintext lead size 1
    X25519_Cipher_L2: str = '6E'  # X25519 sealed box cipher bytes of QB2 plaintext lead size 2
    # big codes
    X25519_Cipher_Big_L0: str = '7AAE'  # X25519 sealed box cipher bytes of QB2 plaintext big lead size 0
    X25519_Cipher_Big_L1: str = '8AAE'  # X25519 sealed box cipher bytes of QB2 plaintext big lead size 1
    X25519_Cipher_Big_L2: str = '9AAE'  # X25519 sealed box cipher bytes of QB2 plaintext big lead size 2

    def __iter__(self):
        """Iterate over the code values so that 'in' tests work."""
        values = astuple(self)
        return iter(values)


CiXVarQB2Dex = CipherX25519QB2VarCodex()  # Make instance


# namedtuple for size entries in Matter and Counter derivation code tables
#   hs is the hard size, int number of chars in hard (stable) part of code
#   ss is the soft size, int number of chars in soft (unstable) part of code
#   fs is the full size, int number of chars in code plus appended material if any
#   ls is the lead size, int number of bytes to pre-pad pre-converted raw binary
Sizage = namedtuple("Sizage", ["hs", "ss", "fs", "ls"])
class Matter:
    """
    Matter is fully qualified cryptographic material primitive base class for
    non-indexed primitives.

    Sub classes are derivation code and key event element context specific.

    Properties:
        code (str): hard part of derivation code to indicate cypher suite
        both (str): hard and soft parts of full text code
        size (int): Number of triplets of bytes including lead bytes
            (quadlets of chars) of variable sized material. Value of soft size,
            ss, part of full text code. Otherwise None.
        fullSize (int): full size in chars of the qualified Base64 form
        raw (bytes): crypto material only without code
        qb64 (str): Base64 fully qualified with derivation code + crypto mat
        qb64b (bytes): Base64 fully qualified with derivation code + crypto mat
        qb2  (bytes): binary with derivation code + crypto material
        transferable (bool): True means code is not in NonTransDex
        digestive (bool): True means code is in DigDex
        prefixive (bool): True means code is in PreDex

    Hidden:
        _code (str): value for .code property
        _raw (bytes): value for .raw property
        _size (int): value for .size property. Number of triplets of bytes
            including lead bytes (quadlets of chars) of variable sized material
            else None.
        _infil (types.MethodType): creates qb64b from .raw and .code
        _binfil (types.MethodType): creates qb2 from .raw and .code
        _exfil (types.MethodType): extracts .code and .raw from qb64b
        _bexfil (types.MethodType): extracts .code and .raw from qb2
    """
    Codex = MtrDex

    # Hards table maps from Base64 first code char to int of hard size, hs,
    # (stable) of code. The soft size, ss, (unstable) is always 0 for Matter
    # unless fs is None which allows for variable size multiple of 4, i.e.
    # not (hs + ss) % 4.
    Hards = ({chr(c): 1 for c in range(65, 65 + 26)})  # size of hard part of code
    Hards.update({chr(c): 1 for c in range(97, 97 + 26)})
    Hards.update([('0', 2), ('1', 4), ('2', 4), ('3', 4), ('4', 2), ('5', 2),
                  ('6', 2), ('7', 4), ('8', 4), ('9', 4)])

    # Sizes table maps from value of hs chars of code to Sizage namedtuple of
    # (hs, ss, fs, ls) where hs is hard size, ss is soft size, fs is full size
    # and ls is lead size.
    # soft size, ss, should always be 0 for Matter unless fs is None which
    # allows for variable size multiple of 4, i.e. not (hs + ss) % 4.
    Sizes = {
        'A': Sizage(hs=1, ss=0, fs=44, ls=0),
        'B': Sizage(hs=1, ss=0, fs=44, ls=0),
        'C': Sizage(hs=1, ss=0, fs=44, ls=0),
        'D': Sizage(hs=1, ss=0, fs=44, ls=0),
        'E': Sizage(hs=1, ss=0, fs=44, ls=0),
        'F': Sizage(hs=1, ss=0, fs=44, ls=0),
        'G': Sizage(hs=1, ss=0, fs=44, ls=0),
        'H': Sizage(hs=1, ss=0, fs=44, ls=0),
        'I': Sizage(hs=1, ss=0, fs=44, ls=0),
        'J': Sizage(hs=1, ss=0, fs=44, ls=0),
        'K': Sizage(hs=1, ss=0, fs=76, ls=0),
        'L': Sizage(hs=1, ss=0, fs=76, ls=0),
        'M': Sizage(hs=1, ss=0, fs=4, ls=0),
        'N': Sizage(hs=1, ss=0, fs=12, ls=0),
        'O': Sizage(hs=1, ss=0, fs=44, ls=0),
        'P': Sizage(hs=1, ss=0, fs=124, ls=0),
        'Q': Sizage(hs=1, ss=0, fs=44, ls=0),
        'R': Sizage(hs=1, ss=0, fs=8, ls=0),
        'S': Sizage(hs=1, ss=0, fs=16, ls=0),
        'T': Sizage(hs=1, ss=0, fs=20, ls=0),
        'U': Sizage(hs=1, ss=0, fs=24, ls=0),
        'V': Sizage(hs=1, ss=0, fs=4, ls=1),
        'W': Sizage(hs=1, ss=0, fs=4, ls=0),
        'X': Sizage(hs=1, ss=0, fs=4, ls=0),
        'Y': Sizage(hs=1, ss=0, fs=8, ls=0),
        '0A': Sizage(hs=2, ss=0, fs=24, ls=0),
        '0B': Sizage(hs=2, ss=0, fs=88, ls=0),
        '0C': Sizage(hs=2, ss=0, fs=88, ls=0),
        '0D': Sizage(hs=2, ss=0, fs=88, ls=0),
        '0E': Sizage(hs=2, ss=0, fs=88, ls=0),
        '0F': Sizage(hs=2, ss=0, fs=88, ls=0),
        '0G': Sizage(hs=2, ss=0, fs=88, ls=0),
        '0H': Sizage(hs=2, ss=0, fs=8, ls=0),
        '0I': Sizage(hs=2, ss=0, fs=88, ls=0),
        '0J': Sizage(hs=2, ss=0, fs=4, ls=0),
        '0K': Sizage(hs=2, ss=0, fs=4, ls=0),
        '0L': Sizage(hs=2, ss=0, fs=8, ls=0),
        '0M': Sizage(hs=2, ss=0, fs=8, ls=0),
        '1AAA': Sizage(hs=4, ss=0, fs=48, ls=0),
        '1AAB': Sizage(hs=4, ss=0, fs=48, ls=0),
        '1AAC': Sizage(hs=4, ss=0, fs=80, ls=0),
        '1AAD': Sizage(hs=4, ss=0, fs=80, ls=0),
        '1AAE': Sizage(hs=4, ss=0, fs=56, ls=0),
        '1AAF': Sizage(hs=4, ss=0, fs=8, ls=0),
        '1AAG': Sizage(hs=4, ss=0, fs=36, ls=0),
        '1AAH': Sizage(hs=4, ss=0, fs=100, ls=0),
        '1AAI': Sizage(hs=4, ss=0, fs=48, ls=0),
        '1AAJ': Sizage(hs=4, ss=0, fs=48, ls=0),
        '1AAK': Sizage(hs=4, ss=0, fs=4, ls=0),
        '1AAL': Sizage(hs=4, ss=0, fs=4, ls=0),
        '1AAM': Sizage(hs=4, ss=0, fs=4, ls=0),
        '2AAA': Sizage(hs=4, ss=0, fs=8, ls=1),
        '3AAA': Sizage(hs=4, ss=0, fs=8, ls=2),
        '4A': Sizage(hs=2, ss=2, fs=None, ls=0),
        '5A': Sizage(hs=2, ss=2, fs=None, ls=1),
        '6A': Sizage(hs=2, ss=2, fs=None, ls=2),
        '7AAA': Sizage(hs=4, ss=4, fs=None, ls=0),
        '8AAA': Sizage(hs=4, ss=4, fs=None, ls=1),
        '9AAA': Sizage(hs=4, ss=4, fs=None, ls=2),
        '4B': Sizage(hs=2, ss=2, fs=None, ls=0),
        '5B': Sizage(hs=2, ss=2, fs=None, ls=1),
        '6B': Sizage(hs=2, ss=2, fs=None, ls=2),
        '7AAB': Sizage(hs=4, ss=4, fs=None, ls=0),
        '8AAB': Sizage(hs=4, ss=4, fs=None, ls=1),
        '9AAB': Sizage(hs=4, ss=4, fs=None, ls=2),
        '4C': Sizage(hs=2, ss=2, fs=None, ls=0),
        '5C': Sizage(hs=2, ss=2, fs=None, ls=1),
        '6C': Sizage(hs=2, ss=2, fs=None, ls=2),
        '7AAC': Sizage(hs=4, ss=4, fs=None, ls=0),
        '8AAC': Sizage(hs=4, ss=4, fs=None, ls=1),
        '9AAC': Sizage(hs=4, ss=4, fs=None, ls=2),
        '4D': Sizage(hs=2, ss=2, fs=None, ls=0),
        '5D': Sizage(hs=2, ss=2, fs=None, ls=1),
        '6D': Sizage(hs=2, ss=2, fs=None, ls=2),
        '7AAD': Sizage(hs=4, ss=4, fs=None, ls=0),
        '8AAD': Sizage(hs=4, ss=4, fs=None, ls=1),
        '9AAD': Sizage(hs=4, ss=4, fs=None, ls=2),
        '4E': Sizage(hs=2, ss=2, fs=None, ls=0),
        '5E': Sizage(hs=2, ss=2, fs=None, ls=1),
        '6E': Sizage(hs=2, ss=2, fs=None, ls=2),
        '7AAE': Sizage(hs=4, ss=4, fs=None, ls=0),
        '8AAE': Sizage(hs=4, ss=4, fs=None, ls=1),
        '9AAE': Sizage(hs=4, ss=4, fs=None, ls=2),
    }

    # Bards table maps first code char converted to binary sextet to hard
    # size, hs. Used for ._bexfil.
    Bards = ({codeB64ToB2(c): hs for c, hs in Hards.items()})

    def __init__(self, raw=None, code=MtrDex.Ed25519N, rize=None,
                 qb64b=None, qb64=None, qb2=None, strip=False):
        """
        Validate as fully qualified

        Parameters:
            raw (bytes): unqualified crypto material usable for crypto operations
            code (str): stable (hard) part of derivation code
            rize (int): raw size in bytes when variable sized material else None
            qb64b (bytes): fully qualified crypto material Base64
            qb64 (str, bytes):  fully qualified crypto material Base64
            qb2 (bytes): fully qualified crypto material Base2
            strip (bool): True means strip (delete) matter from input stream
                bytearray after parsing qb64b or qb2. False means do not strip

        Needs either (raw and code and optionally rize) or qb64b or qb64 or
        qb2, checked in that order of precedence. Otherwise raises
        EmptyMaterialError.

        When raw and code provided then validate that code is correct for
        length of raw and assign .raw, .code and .size.
        Else when qb64b or qb64 or qb2 provided extract and assign
        .raw and .code and .size.

        Raises:
            EmptyMaterialError: no usable combination of parameters given
            TypeError: raw is not bytes or bytearray
            InvalidCodeError: code not in .Sizes
            InvalidVarRawSizeError: bad or unsupported variable raw size
            InvalidVarSizeError: fixed size branch got a variable sized code
            RawMaterialError: raw shorter than size required by code
        """
        size = None  # variable raw binary size including leader in quadlets
        if raw is not None:  # raw provided
            if not code:
                raise EmptyMaterialError("Improper initialization need either "
                                         "(raw and code) or qb64b or qb64 or qb2.")
            if not isinstance(raw, (bytes, bytearray)):
                raise TypeError(f"Not a bytes or bytearray, raw={raw}.")

            if code not in self.Sizes:
                raise InvalidCodeError("Unsupported code={}.".format(code))

            if code[0] in SmallVrzDex or code[0] in LargeVrzDex:  # dynamic size
                if rize:  # use rize to determine length of raw to extract
                    if rize < 0:
                        raise InvalidVarRawSizeError(f"Invalid negative var raw "
                                                     f"size={rize} for "
                                                     f"code={code}.")
                else:  # use length of provided raw as rize
                    rize = len(raw)

                ls = (3 - (rize % 3)) % 3  # calc actual lead (pad) size
                # raw binary size including leader in bytes
                size = (rize + ls) // 3  # calculate value of size in triplets

                if code[0] in SmallVrzDex:  # compute code with sizes
                    if size <= (64 ** 2 - 1):
                        hs = 2
                        s = astuple(SmallVrzDex)[ls]
                        code = f"{s}{code[1:hs]}"
                    elif size <= (64 ** 4 - 1):  # make big version of code
                        hs = 4
                        s = astuple(LargeVrzDex)[ls]
                        code = f"{s}{'A' * (hs - 2)}{code[1]}"
                    else:
                        raise InvalidVarRawSizeError(f"Unsupported raw size for "
                                                     f"code={code}.")
                elif code[0] in LargeVrzDex:  # compute code with sizes
                    if size <= (64 ** 4 - 1):
                        hs = 4
                        s = astuple(LargeVrzDex)[ls]
                        code = f"{s}{code[1:hs]}"
                    else:
                        raise InvalidVarRawSizeError(f"Unsupported raw size for "
                                                     f"code={code}.")
                else:  # defensive, not reachable given enclosing condition
                    raise InvalidVarRawSizeError(f"Unsupported variable raw size "
                                                 f"code={code}.")
            else:
                hs, ss, fs, ls = self.Sizes[code]  # get sizes assumes ls consistent
                if not fs:  # invalid
                    raise InvalidVarSizeError(f"Unsupported variable size "
                                              f"code={code}.")
                rize = Matter._rawSize(code)

            raw = raw[:rize]  # copy only exact size from raw stream
            if len(raw) != rize:  # forbids shorter
                raise RawMaterialError(f"Not enough raw bytes for code={code} "
                                       f"expected {rize} got {len(raw)}.")

            self._code = code  # hard value part of code
            self._size = size  # soft value part of code in int
            self._raw = bytes(raw)  # crypto ops require bytes not bytearray

        elif qb64b is not None:
            self._exfil(qb64b)
            if strip:  # assumes bytearray
                del qb64b[:self.fullSize]

        elif qb64 is not None:
            self._exfil(qb64)

        elif qb2 is not None:
            self._bexfil(qb2)
            if strip:  # assumes bytearray
                del qb2[:self.fullSize * 3 // 4]

        else:
            raise EmptyMaterialError("Improper initialization need either "
                                     "(raw and code) or qb64b or qb64 or qb2.")

    @classmethod
    def _rawSize(cls, code):
        """
        Returns raw size in bytes not including leader for a given code

        Parameters:
            code (str): derivation code Base64

        Raises:
            InvalidCodeSizeError: code is variable sized (fs is None)
        """
        hs, ss, fs, ls = cls.Sizes[code]  # get sizes
        cs = hs + ss  # both hard + soft code size
        if fs is None:
            raise InvalidCodeSizeError(f"Non-fixed raw size code {code}.")
        return (((fs - cs) * 3 // 4) - ls)

    @classmethod
    def _leadSize(cls, code):
        """
        Returns lead size in bytes for a given code

        Parameters:
            code (str): derivation code Base64
        """
        _, _, _, ls = cls.Sizes[code]  # get lead size from .Sizes table
        return ls

    @property
    def code(self):
        """
        Returns ._code which is the hard part only of full text code.
        Some codes only have a hard part. Soft part is for variable sized matter.
        Makes .code read only
        """
        return self._code

    @property
    def both(self):
        """
        Returns both hard and soft parts of full text code as str
        """
        _, ss, _, _ = self.Sizes[self.code]
        return (f"{self.code}{intToB64(self.size, l=ss)}")

    @property
    def size(self):
        """
        Returns ._size int or None if not variable sized matter
        Makes .size read only

        Number of triplets of bytes including lead bytes (quadlets of chars)
        of variable sized material. Value of soft size, ss, part of full text
        code.
        """
        return self._size

    @property
    def fullSize(self):
        """
        Returns full size of matter in Base64 chars
        Fixed size codes returns fs from .Sizes
        Variable size codes where fs==None computes fs from .size and sizes
        """
        hs, ss, fs, _ = self.Sizes[self.code]  # get sizes
        if fs is None:  # compute fs from ss characters in code
            fs = hs + ss + (self.size * 4)
        return fs

    @property
    def raw(self):
        """
        Returns ._raw
        Makes .raw read only
        """
        return self._raw

    @property
    def qb64b(self):
        """
        Property qb64b:
        Returns Fully Qualified Base64 Version encoded as bytes
        Assumes self.raw and self.code are correctly populated
        """
        return self._infil()

    @property
    def qb64(self):
        """
        Property qb64:
        Returns Fully Qualified Base64 Version
        Assumes self.raw and self.code are correctly populated
        """
        return self.qb64b.decode("utf-8")

    @property
    def qb2(self):
        """
        Property qb2:
        Returns Fully Qualified Binary Version Bytes
        """
        return self._binfil()

    @property
    def transferable(self):
        """
        Property transferable:
        Returns True if .code is not in NonTransDex, False otherwise
        """
        return (self.code not in NonTransDex)

    @property
    def digestive(self):
        """
        Property digestive:
        Returns True if .code is in DigDex, False otherwise
        """
        return (self.code in DigDex)

    @property
    def prefixive(self):
        """
        Property prefixive:
        Returns True if .code is in PreDex, False otherwise
        """
        return (self.code in PreDex)

    def _infil(self):
        """
        Returns bytes of fully qualified base64 characters
        self.code + converted self.raw to Base64 with pad chars stripped

        cs = hs + ss
        fs = (size * 4) + cs

        Raises:
            InvalidCodeSizeError: code size inconsistent with raw pad size
            InvalidVarSizeError: .size out of range for soft size of code
        """
        code = self.code  # hard size codex value
        size = self.size  # size if variable length, None otherwise
        raw = self.raw  # bytes or bytearray

        ps = ((3 - (len(raw) % 3)) % 3)  # pad size chars or lead size bytes
        hs, ss, fs, ls = self.Sizes[code]
        if not fs:  # variable sized, compute code ss value from .size
            cs = hs + ss  # both hard + soft size
            if cs % 4:
                raise InvalidCodeSizeError(f"Whole code size not multiple of 4 for "
                                           f"variable length material. cs={cs}.")

            if size < 0 or size > (64 ** ss - 1):
                raise InvalidVarSizeError("Invalid size={} for code={}."
                                          "".format(size, code))
            # both is hard code + size converted to ss B64 chars
            both = f"{code}{intToB64(size, l=ss)}"
            if len(both) % 4 != ps - ls:  # adjusted pad given lead bytes
                raise InvalidCodeSizeError(f"Invalid code={both} for converted"
                                           f" raw pad size={ps}.")
            # prepad, convert, and prepend
            return (both.encode("utf-8") + encodeB64(bytes([0] * ls) + raw))

        else:  # fixed size so prepad but lead ls may not be zero
            both = code
            cs = len(both)
            if (cs % 4) != ps - ls:  # adjusted pad given lead bytes
                raise InvalidCodeSizeError(f"Invalid code={both} for converted"
                                           f" raw pad size={ps}.")
            # prepad, convert, and replace upfront
            # when fixed and ls != 0 then cs % 4 is zero and ps==ls
            # otherwise  fixed and ls == 0 then cs % 4 == ps
            return (both.encode("utf-8") + encodeB64(bytes([0] * ps) + raw)[cs % 4:])

    def _binfil(self):
        """
        Returns bytes of fully qualified base2 bytes, that is .qb2
        self.code converted to Base2 + self.raw left shifted with pad bits
        equivalent of Base64 decode of .qb64 into .qb2

        Raises:
            InvalidCodeSizeError: code size inconsistent with table or raw size
            InvalidVarSizeError: .size out of range for soft size of code
        """
        code = self.code  # codex value
        size = self.size  # optional size if variable length
        raw = self.raw  # bytes or bytearray

        hs, ss, fs, ls = self.Sizes[code]
        cs = hs + ss

        if not fs:  # compute both and fs from size
            if cs % 4:
                raise InvalidCodeSizeError("Whole code size not multiple of 4 for "
                                           "variable length material. cs={}.".format(cs))

            if size < 0 or size > (64 ** ss - 1):
                raise InvalidVarSizeError("Invalid size={} for code={}."
                                          "".format(size, code))
            # both is hard code + converted index
            both = f"{code}{intToB64(size, l=ss)}"
            fs = hs + ss + (size * 4)
        else:
            both = code

        if len(both) != cs:
            # report actual code length against the length from the table
            raise InvalidCodeSizeError("Mismatch code size = {} with table = {}."
                                       .format(len(both), cs))

        n = sceil(cs * 3 / 4)  # number of b2 bytes to hold b64 code
        # convert code both to right align b2 int then left shift in pad bits
        # then convert to bytes
        bcode = (b64ToInt(both) << (2 * (cs % 4))).to_bytes(n, 'big')
        full = bcode + bytes([0] * ls) + raw
        bfs = len(full)
        if bfs % 3 or (bfs * 4 // 3) != fs:  # invalid size
            raise InvalidCodeSizeError(f"Invalid code={both} for raw size={len(raw)}.")
        return full

    def _exfil(self, qb64b):
        """
        Extracts self.code, self.size and self.raw from qualified base64 qb64b

        Parameters:
            qb64b (str | bytes | bytearray): fully qualified base64 from stream

        cs = hs + ss
        fs = (size * 4) + cs

        Raises:
            ShortageError: not enough chars in qb64b yet
            UnexpectedCountCodeError: starts with count code selector '-'
            UnexpectedOpCodeError: starts with op code selector '_'
            UnexpectedCodeError: unsupported code
            ValidationError: variable sized code not multiple of 4 chars
            ValueError: non-zero pad bits or lead bytes
            ConversionError: decoded raw length inconsistent with code
        """
        if not qb64b:  # empty need more bytes
            raise ShortageError("Empty material.")

        first = qb64b[:1]  # extract first char code selector
        if hasattr(first, "decode"):
            first = first.decode("utf-8")
        if first not in self.Hards:
            if first[0] == '-':
                raise UnexpectedCountCodeError("Unexpected count code start "
                                               "while extracting Matter.")
            elif first[0] == '_':
                raise UnexpectedOpCodeError("Unexpected op code start "
                                            "while extracting Matter.")
            else:
                raise UnexpectedCodeError(f"Unsupported code start char={first}.")

        hs = self.Hards[first]  # get hard code size
        if len(qb64b) < hs:  # need more bytes
            raise ShortageError(f"Need {hs - len(qb64b)} more characters.")

        hard = qb64b[:hs]  # extract hard code
        if hasattr(hard, "decode"):
            hard = hard.decode("utf-8")  # converts bytes/bytearray to str
        if hard not in self.Sizes:
            raise UnexpectedCodeError(f"Unsupported code ={hard}.")

        hs, ss, fs, ls = self.Sizes[hard]  # assumes hs in both tables match
        cs = hs + ss  # both hs and ss
        size = None
        if not fs:  # compute fs from size chars in ss part of code
            if cs % 4:
                raise ValidationError(f"Whole code size not multiple of 4 for "
                                      f"variable length material. cs={cs}.")
            if len(qb64b) < cs:  # need all soft size chars, same as ._bexfil
                raise ShortageError(f"Need {cs - len(qb64b)} more characters.")
            size = qb64b[hs:hs + ss]  # extract size chars
            if hasattr(size, "decode"):
                size = size.decode("utf-8")
            size = b64ToInt(size)  # compute int size
            fs = (size * 4) + cs

        # assumes that unit tests on Matter and MatterCodex ensure that
        # .Codes and .Sizes are well formed.
        # hs consistent and ss == 0 and not fs % 4 and hs > 0 and fs >= hs + ss
        # unless fs is None

        if len(qb64b) < fs:  # need more bytes
            raise ShortageError(f"Need {fs - len(qb64b)} more chars.")

        qb64b = qb64b[:fs]  # fully qualified primitive code plus material
        if hasattr(qb64b, "encode"):  # only convert extracted chars from stream
            qb64b = qb64b.encode("utf-8")

        # check for non-zeroed pad bits or lead bytes
        ps = cs % 4  # code pad size ps = cs mod 4
        pbs = 2 * (ps if ps else ls)  # pad bit size in bits
        if ps:  # ps. IF ps THEN not ls (lead) and vice versa OR not ps and not ls
            base = ps * b'A' + qb64b[cs:]  # replace pre code with prepad chars of zero
            paw = decodeB64(base)  # decode base to leave prepadded raw
            pi = (int.from_bytes(paw[:ps], "big"))  # prepad as int
            if pi & (2 ** pbs - 1):  # masked pad bits non-zero
                raise ValueError(f"Non zeroed prepad bits = "
                                 f"{pi & (2 ** pbs - 1):<06b} in {qb64b[cs:cs+1]}.")
            raw = paw[ps:]  # strip off ps prepad paw bytes
        else:  # not ps. IF not ps THEN may or may not be ls (lead)
            base = qb64b[cs:]  # strip off code leaving lead chars if any and value
            # decode lead chars + val leaving lead bytes + raw bytes
            # then strip off ls lead bytes leaving raw
            paw = decodeB64(base)  # decode base to leave prepadded paw bytes
            li = int.from_bytes(paw[:ls], "big")  # lead as int
            if li:  # pre pad lead bytes must be zero
                if ls == 1:
                    raise ValueError(f"Non zeroed lead byte = 0x{li:02x}.")
                else:
                    raise ValueError(f"Non zeroed lead bytes = 0x{li:04x}.")
            raw = paw[ls:]  # paw is bytes so raw is bytes

        if len(raw) != ((len(qb64b) - cs) * 3 // 4) - ls:  # exact lengths
            raise ConversionError(f"Improperly qualified material = {qb64b}")

        self._code = hard  # hard only
        self._size = size
        self._raw = raw  # bytes from decodeB64 so immutable for crypto ops

    def _bexfil(self, qb2):
        """
        Extracts self.code, self.size and self.raw from qualified base2 qb2

        Parameters:
            qb2 (bytes | bytearray): fully qualified base2 from stream

        Raises:
            ShortageError: not enough bytes in qb2 yet
            UnexpectedCountCodeError: starts with count code selector sextet
            UnexpectedOpCodeError: starts with op code selector sextet
            UnexpectedCodeError: unsupported code
            ValidationError: variable sized code not multiple of 4 chars
            ValueError: non-zero pad bits or lead bytes
            ConversionError: raw length inconsistent with code
        """
        if not qb2:  # empty need more bytes
            raise ShortageError("Empty material, Need more bytes.")

        first = nabSextets(qb2, 1)  # extract first sextet as code selector
        if first not in self.Bards:
            # compare the whole one byte sextet, indexing bytes yields an int
            # which never equals a bytes literal
            if first == b'\xf8':  # b64ToB2('-')
                raise UnexpectedCountCodeError("Unexpected count code start "
                                               "while extracting Matter.")
            elif first == b'\xfc':  # b64ToB2('_')
                raise UnexpectedOpCodeError("Unexpected op code start "
                                            "while extracting Matter.")
            else:
                raise UnexpectedCodeError(f"Unsupported code start sextet={first}.")

        hs = self.Bards[first]  # get code hard size equivalent sextets
        bhs = sceil(hs * 3 / 4)  # bhs is min bytes to hold hs sextets
        if len(qb2) < bhs:  # need more bytes
            raise ShortageError(f"Need {bhs - len(qb2)} more bytes.")

        hard = codeB2ToB64(qb2, hs)  # extract and convert hard part of code
        if hard not in self.Sizes:
            raise UnexpectedCodeError(f"Unsupported code ={hard}.")

        hs, ss, fs, ls = self.Sizes[hard]
        cs = hs + ss  # both hs and ss
        bcs = sceil(cs * 3 / 4)  # bcs is min bytes to hold cs sextets
        size = None
        if not fs:  # compute fs from size chars in ss part of code
            if cs % 4:
                raise ValidationError("Whole code size not multiple of 4 for "
                                      "variable length material. cs={}.".format(cs))

            if len(qb2) < bcs:  # need more bytes
                raise ShortageError("Need {} more bytes.".format(bcs - len(qb2)))

            both = codeB2ToB64(qb2, cs)  # extract and convert both hard and soft part of code
            size = b64ToInt(both[hs:hs + ss])  # get size
            fs = (size * 4) + cs

        # assumes that unit tests on Matter and MatterCodex ensure that
        # .Codes and .Sizes are well formed.
        # hs consistent and ss == 0 and not fs % 4 and hs > 0 and
        # (fs >= hs + ss if fs is not None else True)

        bfs = sceil(fs * 3 / 4)  # bfs is min bytes to hold fs sextets
        if len(qb2) < bfs:  # need more bytes
            raise ShortageError("Need {} more bytes.".format(bfs - len(qb2)))

        qb2 = qb2[:bfs]  # extract qb2 fully qualified primitive code plus material

        # check for non-zeroed prepad bits or lead bytes
        ps = cs % 4  # code pad size ps = cs mod 4
        pbs = 2 * (ps if ps else ls)  # pad bit size in bits
        if ps:  # ps. IF ps THEN not ls (lead) and vice versa OR not ps and not ls
            # convert last byte of code bytes in which are pad bits to int
            pi = (int.from_bytes(qb2[bcs-1:bcs], "big"))
            if pi & (2 ** pbs - 1):  # masked pad bits non-zero
                raise ValueError(f"Non zeroed pad bits = "
                                 f"{pi & (2 ** pbs - 1):>08b} in 0x{pi:02x}.")
        else:  # not ps. IF not ps THEN may or may not be ls (lead)
            li = int.from_bytes(qb2[bcs:bcs+ls], "big")  # lead as int
            if li:  # pre pad lead bytes must be zero
                if ls == 1:
                    raise ValueError(f"Non zeroed lead byte = 0x{li:02x}.")
                else:
                    raise ValueError(f"Non zeroed lead bytes = 0x{li:04x}.")

        raw = qb2[(bcs + ls):]  # strip code and leader bytes from qb2 to get raw

        if len(raw) != (len(qb2) - bcs - ls):  # exact lengths
            raise ConversionError(f"Improperly qualified material = {qb2}")

        self._code = hard
        self._size = size
        self._raw = bytes(raw)  # ensure bytes so immutable and crypto operations
class Seqner(Matter):
    """
    Seqner is subclass of Matter, cryptographic material, for ordinal numbers
    such as sequence numbers or first seen ordering numbers.
    Seqner provides fully qualified format for ordinals (sequence numbers etc)
    when provided as attached cryptographic material elements.

    Useful when parsing attached receipt groupings with sn from stream or
    database.

    Uses default initialization code = MtrDex.Salt_128
    Raises ValidationError on init if resulting code is not MtrDex.Salt_128

    Inherited Properties: (See Matter)
        .code is str derivation code
        .raw is bytes crypto material only without code
        .qb64 is str in Base64 fully qualified with derivation code + crypto mat
        .qb64b is bytes in Base64 fully qualified with derivation code + crypto mat
        .qb2 is bytes in binary with derivation code + crypto material
        .transferable is Boolean, True when transferable derivation code

    Properties:
        .sn is int sequence number
        .snh is hex string representation of sequence number no leading zeros
    """

    def __init__(self, raw=None, qb64b=None, qb64=None, qb2=None,
                 code=MtrDex.Salt_128, sn=None, snh=None, **kwa):
        """
        Inherited Parameters:  (see Matter)
            raw is bytes of unqualified crypto material
            qb64b is bytes of fully qualified crypto material
            qb64 is str or bytes of fully qualified crypto material
            qb2 is bytes of fully qualified crypto material
            code is str of derivation code

        Parameters:
            sn is int sequence number or some form of ordinal number
            snh is hex string of sequence number

        When none of raw, qb64b, qb64, qb2 is given the raw value is built
        from sn, else from snh, else from zero.
        """
        no_material = all(item is None for item in (raw, qb64b, qb64, qb2))
        if no_material:
            if sn is None:
                # fall back to hex string when present, zero otherwise
                sn = 0 if snh is None else int(snh, 16)
            width = Matter._rawSize(MtrDex.Salt_128)
            raw = sn.to_bytes(width, 'big')

        super(Seqner, self).__init__(raw=raw, qb64b=qb64b, qb64=qb64, qb2=qb2,
                                     code=code, **kwa)

        if self.code != MtrDex.Salt_128:
            raise ValidationError("Invalid code = {} for Seqner."
                                  "".format(self.code))

    @property
    def sn(self):
        """
        Property sn: sequence number as int
        Returns .raw interpreted as big endian unsigned int
        """
        ordinal = int.from_bytes(self.raw, 'big')
        return ordinal

    @property
    def snh(self):
        """
        Property snh: sequence number as hex
        Returns .sn int converted to lowercase hex str without prefix
        """
        return format(self.sn, "x")
class Number(Matter):
    """
    Number is subclass of Matter, cryptographic material, for ordinal counting
    whole numbers (non-negative integers) up to a maximum size of 16 bytes,
    256 ** 16 - 1.
    Example uses are sequence numbers or first seen ordering numbers or
    thresholds.

    Number provides fully qualified format for such numbers when provided as
    attached cryptographic material elements.

    Uses default initialization code = NumDex.Short. When created from num or
    numh the code is selected from NumDex by the magnitude of the number and
    the code parameter is ignored.
    Raises ValidationError on init if resulting code is not in NumDex.

    Inherited Properties:  (See Matter)
        code (str): hard part of derivation code
        both (str): hard and soft parts of full text code
        size (int): Number of triplets of bytes including lead bytes
            (quadlets of chars) of variable sized material. Otherwise None.
        raw (bytes): crypto material only without code
        qb64 (str): Base64 fully qualified with derivation code + crypto mat
        qb64b (bytes): Base64 fully qualified with derivation code + crypto mat
        qb2  (bytes): binary with derivation code + crypto material
        transferable (bool): see Matter
        digestive (bool): see Matter

    Properties:
        num  (int): int representation of number
        numh (str): hex string representation of number with no leading zeros
        sn (int): alias for num to mimic Seqner interface
        snh (str): alias for numh to mimic Seqner interface
        positive (bool): True if .num > 0, False otherwise. Because .num must
            be non-negative, .positive == False means .num == 0
        inceptive (bool): True if .num == 0, False otherwise.
    """

    def __init__(self, raw=None, qb64b=None, qb64=None, qb2=None,
                 code=NumDex.Short, num=None, numh=None, **kwa):
        """
        Inherited Parameters:  (see Matter)
            raw (bytes): unqualified crypto material usable for crypto operations
            code (str): stable (hard) part of derivation code
            rize (int): raw size in bytes when variable sized material else None
            qb64b (bytes): fully qualified crypto material Base64
            qb64 (str, bytes):  fully qualified crypto material Base64
            qb2 (bytes): fully qualified crypto material Base2
            strip (bool): True means strip (delete) matter from input stream
                bytearray after parsing qb64b or qb2. False means do not strip

        Parameters:
            num (int | str | None): non-negative int number or hex str of int
                number or 0 if None
            numh (str):  hex string equivalent of non-negative int number,
                only used when num is None

        Note: int("0xab", 16) is also valid since int recognizes 0x hex prefix

        Raises:
            InvalidValueError: num or numh is not a valid whole number or is
                too large to encode in 16 bytes
            ValidationError: resulting code is not in NumDex
        """
        if raw is None and qb64b is None and qb64 is None and qb2 is None:
            # remember what the caller supplied so a failed conversion reports
            # the offending value instead of None
            given = numh if num is None else num
            try:
                if num is None:
                    if numh is None or numh == '':
                        num = 0
                    else:
                        num = int(numh, 16)
                elif isinstance(num, str):  # handle case where num is hex str
                    num = 0 if num == '' else int(num, 16)
            except ValueError as ex:
                raise InvalidValueError(f"Invalid whole number={given!r}.") from ex

            if not isinstance(num, int) or num < 0:
                raise InvalidValueError(f"Invalid whole number={num}.")

            if num <= (256 ** 2 - 1):  # make short version of code
                code = NumDex.Short
            elif num <= (256 ** 4 - 1):  # make long version of code
                code = NumDex.Long
            elif num <= (256 ** 8 - 1):  # make big version of code
                code = NumDex.Big
            elif num <= (256 ** 16 - 1):  # make huge version of code
                code = NumDex.Huge
            else:
                raise InvalidValueError(f"Invalid num = {num}, too large to encode.")

            # default to_bytes parameter signed is False so result is
            # big endian unsigned
            raw = num.to_bytes(Matter._rawSize(code), 'big')

        super(Number, self).__init__(raw=raw, qb64b=qb64b, qb64=qb64, qb2=qb2,
                                     code=code, **kwa)

        if self.code not in NumDex:
            raise ValidationError(f"Invalid code = {self.code} for Number.")

    @property
    def num(self):
        """
        Property num: number as int
        Returns .raw converted to int
        """
        return int.from_bytes(self.raw, 'big')

    @property
    def numh(self):
        """
        Property numh: number as hex string no leading zeros
        Returns .num int converted to hex str
        """
        return f"{self.num:x}"

    @property
    def sn(self):
        """Sequence number, sn property getter to mimic Seqner interface

        Returns:
            sn (int): alias for num
        """
        return self.num

    @property
    def snh(self):
        """Sequence number hex str, snh property getter to mimic Seqner interface

        Returns:
            snh (hex str): alias for numh
        """
        return self.numh

    @property
    def positive(self):
        """
        Returns True if .num is strictly positive non-zero False otherwise.
        Because valid number .num must be non-negative, positive False also
        means that .num is zero.
        """
        return self.num > 0

    @property
    def inceptive(self):
        """
        Returns True if .num == 0 False otherwise.
        Because valid number .num must be non-negative, inceptive False means
        that .num is strictly positive.
        """
        return self.num == 0
class Dater(Matter):
    """
    Dater is subclass of Matter, cryptographic material, for RFC-3339 profile of
    ISO-8601 formatted datetimes.

    Dater provides a custom Base64 coding of an ASCII RFC-3339 profile of an
    ISO-8601 datetime by replacing (using translate) the three non-Base64
    characters, ':.+' with the Base64 equivalents, 'cdp' respectively.
    Dater provides a more compact representation than would be obtained by
    converting the raw ASCII RFC-3339 profile ISO-8601 datetime to Base64.
    Dater supports datetimes as attached crypto material in replay of events
    for the datetime of when the event was first seen.
    The datetime textual representation is restricted to a specific 32 byte
    variant (profile) of ISO-8601 datetime with microseconds and UTC offset
    in HH:MM (See RFC-3339).

    Uses default initialization derivation code = MtrDex.DateTime.
    Raises ValidationError on init if resulting code is not MtrDex.DateTime

    Examples: given RFC-3339 profiles of ISO-8601 datetime strings:

        '2020-08-22T17:50:09.988921+00:00'
        '2020-08-22T17:50:09.988921-01:00'

    The fully encoded qualified Base64, .qb64 versions are respectively

        '1AAG2020-08-22T17c50c09d988921p00c00'
        '1AAG2020-08-22T17c50c09d988921-01c00'

    The qualified binary version, .qb2 is the Base64 decoding of the qualified
    Base64, qb64. The raw binary is the Base64 decoding of the datetime only
    portion, e.g. '2020-08-22T17c50c09d988921p00c00'

    Example uses: attached first seen couples with fn+dt

    Inherited Properties: (See Matter)
        .code is str derivation code
        .raw is bytes crypto material only without code
        .qb64 is str in Base64 fully qualified with derivation code + crypto mat
        .qb64b is bytes in Base64 fully qualified with derivation code + crypto mat
        .qb2 is bytes in binary with derivation code + crypto material
        .transferable is Boolean, True when transferable derivation code

    Properties:
        .dts is the ISO-8601 datetime string
        .dtsb is the ISO-8601 datetime string encoded as bytes
        .datetime is the value of helping.fromIso8601 applied to .dts
    """
    ToB64 = str.maketrans(":.+", "cdp")  # translate characters
    FromB64 = str.maketrans("cdp", ":.+")  # translate characters

    def __init__(self, raw=None, qb64b=None, qb64=None, qb2=None,
                 code=MtrDex.DateTime, dts=None, **kwa):
        """
        Inherited Parameters:  (see Matter)
            raw is bytes of unqualified crypto material
            qb64b is bytes of fully qualified crypto material
            qb64 is str or bytes of fully qualified crypto material
            qb2 is bytes of fully qualified crypto material
            code is str of derivation code. Defaults to MtrDex.DateTime, the
                only code accepted by the validation below, so that
                Dater(raw=raw) works without an explicit code.

        Parameters:
            dts is the ISO-8601 datetime as str or bytes. Only used when none
                of raw, qb64b, qb64, qb2 is given. Defaults to
                helping.nowIso8601() when None.

        Raises:
            ValidationError: resulting code is not MtrDex.DateTime
        """
        if raw is None and qb64b is None and qb64 is None and qb2 is None:
            if dts is None:  # defaults to now
                dts = helping.nowIso8601()
            if hasattr(dts, "decode"):
                dts = dts.decode("utf-8")
            # length of dts is validated by Matter when extracting from qb64
            qb64 = MtrDex.DateTime + dts.translate(self.ToB64)

        super(Dater, self).__init__(raw=raw, qb64b=qb64b, qb64=qb64, qb2=qb2,
                                    code=code, **kwa)
        if self.code != MtrDex.DateTime:
            raise ValidationError("Invalid code = {} for Dater date time."
                                  "".format(self.code))

    @property
    def dts(self):
        """
        Property dts:  date-time-stamp str
        Returns .qb64 with code removed translated to RFC-3339 profile of
        ISO 8601 datetime str
        """
        return self.qb64[self.Sizes[self.code].hs:].translate(self.FromB64)

    @property
    def dtsb(self):
        """
        Property dtsb:  date-time-stamp bytes
        Returns .dts encoded as utf-8 bytes
        """
        return self.dts.encode("utf-8")

    @property
    def datetime(self):
        """
        Property datetime:
        Returns result of helping.fromIso8601 applied to .dts
        """
        return helping.fromIso8601(self.dts)
class Bexter(Matter):
    """
    Bexter is subclass of Matter, cryptographic material, for variable length
    strings that only contain Base64 URL safe characters, i.e. Base64 text (bext).

    When created using the 'bext' parameter, the encoded matter in qb64 format
    in the text domain is more compact than would be the case if the string
    were passed in as raw bytes. The text is used as is to form the value part
    of the qb64 version not including the leader.

    Due to ambiguity that arises from pre-padding bext whose length is a
    multiple of three with one or more 'A' chars, any bext that starts with an
    'A' and whose length is either a multiple of 3 or 4 may not round trip.
    Bext with a leading 'A' whose length is a multiple of four may have the
    leading 'A' stripped when round tripping.

        Bexter(bext='ABBB').bext == 'BBB'
        Bexter(bext='BBB').bext == 'BBB'
        Bexter(bext='ABBB').qb64 == '4AABABBB' == Bexter(bext='BBB').qb64

    To avoid this problem, only use for applications of base 64 strings that
    never start with 'A'

    Examples: base64 text strings:

        bext = ""      qb64 = '4AAA'
        bext = "-"     qb64 = '6AABAAA-'
        bext = "-A"    qb64 = '5AABAA-A'
        bext = "-A-"   qb64 = '4AABA-A-'
        bext = "-A-B"  qb64 = '4AAB-A-B'

    Example uses:
        CESR encoded paths for nested SADs and SAIDs
        CESR encoded fractionally weighted threshold expressions

    Inherited Properties: (See Matter)
        .code is str derivation code
        .raw is bytes crypto material only without code
        .qb64 is str in Base64 fully qualified with derivation code + crypto mat
        .qb64b is bytes in Base64 fully qualified with derivation code + crypto mat
        .qb2 is bytes in binary with derivation code + crypto material
        .transferable is Boolean, True when transferable derivation code

    Properties:
        .bext is the Base64 text value, .qb64 with text code and leader removed.
    """

    def __init__(self, raw=None, qb64b=None, qb64=None, qb2=None,
                 code=MtrDex.StrB64_L0, bext=None, **kwa):
        """
        Inherited Parameters:  (see Matter)
            raw is bytes of unqualified crypto material
            qb64b is bytes of fully qualified crypto material
            qb64 is str or bytes of fully qualified crypto material
            qb2 is bytes of fully qualified crypto material
            code is str of derivation code

        Parameters:
            bext is the variable sized Base64 text string (str or bytes).
                Only used when none of raw, qb64b, qb64, qb2 is given.
        """
        nothing_qualified = all(part is None
                                for part in (raw, qb64b, qb64, qb2))
        if nothing_qualified:
            if bext is None:
                raise EmptyMaterialError("Missing bext string.")
            if hasattr(bext, "encode"):  # str given so work on bytes
                bext = bext.encode("utf-8")
            if not Reb64.match(bext):
                raise ValueError("Invalid Base64.")
            raw = self._rawify(bext)

        super(Bexter, self).__init__(raw=raw, qb64b=qb64b, qb64=qb64, qb2=qb2,
                                     code=code, **kwa)

        if self.code not in BexDex:
            raise ValidationError("Invalid code = {} for Bexter."
                                  "".format(self.code))

    def _rawify(self, bext):
        """Returns raw value equivalent of Base64 text.
        Suitable for variable sized matter

        Parameters:
            bext (bytes): Base64 bytes
        """
        rem = len(bext) % 4  # bext size mod 4
        wad = b'A' * (-rem % 4)  # pre conversion wad of Base64 zeros
        lead = -rem % 3  # post conversion lead size in bytes
        # prepend wad, convert, then drop the lead bytes
        return decodeB64(wad + bext)[lead:]

    @property
    def bext(self):
        """
        Property bext: Base64 text value portion of qualified b64 str
        Returns the value portion of .qb64 with text code and leader removed
        """
        lead = self.Sizes[self.code].ls
        text = encodeB64(bytes(lead) + self.raw)  # lead zero bytes + raw
        if lead:
            skip = (lead + 1) % 4  # chars taken up by the leader
        elif text[:1] == b'A':
            skip = 1  # strip leading 'A' zero pad
        else:
            skip = 0
        return text[skip:].decode('utf-8')
class Pather(Bexter):
    """
    Pather is a subclass of Bexter that provides SAD Path language specific
    functionality for variable length strings that only contain Base64 URL
    safe characters.

    Pather allows the specification of SAD Paths as a list of field components
    which is converted to the Base64 URL safe character representation: each
    component is prefixed with '-'.

    Additionally, Pather provides .resolve for extracting, and .tail for
    extracting and serializing, the content targeted by .path within a SAD
    (for .tail the SAD is given as an instance of Serder).

    Pather enforces Base64 URL character safety by leveraging the fact that
    SADs must have static field ordering. Any field label can be replaced by
    its field ordinal to allow for path specification and traversal for any
    field labels that contain non-Base64 URL safe characters.

    Examples: strings:

        path = []                   text = "-"         qb64 = '6AABAAA-'
        path = ["A"]                text = "-A"        qb64 = '5AABAA-A'
        path = ["A", "B"]           text = "-A-B"      qb64 = '4AAB-A-B'
        path = ["A", 1, "B", 3]     text = "-A-1-B-3"  qb64 = '4AAC-A-1-B-3'
    """

    def __init__(self, raw=None, qb64b=None, qb64=None, qb2=None, bext=None,
                 code=MtrDex.StrB64_L0, path=None, **kwa):
        """
        Inherited Parameters: (see Bexter)
            raw is bytes of unqualified crypto material usable for crypto operations
            qb64b is bytes of fully qualified crypto material
            qb64 is str or bytes of fully qualified crypto material
            qb2 is bytes of fully qualified crypto material
            code is str of derivation code
            bext is the variable sized Base64 text string

        Parameters:
            path (list): array of path field components. Only consulted when
                none of raw, bext, qb64b, qb64, qb2 is given.

        Raises:
            EmptyMaterialError: no material and no path were supplied
        """
        if (raw is None and bext is None and qb64b is None
                and qb64 is None and qb2 is None):
            if path is None:
                raise EmptyMaterialError("Missing path list.")
            bext = self._bextify(path)

        super().__init__(raw=raw, qb64b=qb64b, qb64=qb64, qb2=qb2,
                         bext=bext, code=code, **kwa)

    @property
    def path(self):
        """ Path property is an array of path elements.

        An empty path represents the top level. A new list is built on every
        access.

        Returns:
            list: array of field specs (str) of the path

        Raises:
            Exception: .bext does not start with '-'
        """
        bext = self.bext
        if not bext.startswith("-"):
            raise Exception("invalid SAD ptr")

        path = bext.strip("-").split("-")
        return path if path[0] != '' else []

    def root(self, root):
        """ Returns a new Pather anchored at new root

        The components of root are prepended to the components of this path.

        Args:
            root (Pather): the new root to apply to this path

        Returns:
            Pather: new path anchored at root
        """
        return Pather(path=root.path + self.path)

    def strip(self, root):
        """ Returns a new Pather with root stripped off the front if it exists

        The components of root are removed only when they are a leading
        prefix of this path. Otherwise (root longer than this path, or not a
        prefix of it) an unchanged copy of this path is returned.

        Args:
            root (Pather): the root to strip from the front of this path

        Returns:
            Pather: new path with root removed from the front
        """
        own = self.path
        prefix = root.path

        # Compare leading components as a whole. Removing components one by
        # one with list.remove would also delete matches that are not at the
        # front of the path.
        if own[:len(prefix)] == prefix:
            own = own[len(prefix):]

        return Pather(path=own)

    def startswith(self, path):
        """ Returns True if path is the root of self

        The match must end on a component boundary so that, for example,
        '-ab' is not considered to start with '-a'.

        Parameters:
            path (Pather): the path to check against self

        Returns:
            bool: True if path is the root of self
        """
        own = self.bext
        other = path.bext

        if not own.startswith(other):
            return False

        # textual prefix matched; accept only when it ends where a component
        # ends: at the end of own, right after a '-', or right before a '-'
        return (len(own) == len(other)
                or other.endswith("-")
                or own[len(other)] == "-")

    def resolve(self, sad):
        """ Follows .path through sad and returns the value found there

        Parameters:
            sad (dict or list): the structure in which to resolve .path

        Returns:
            Value at the end of the path
        """
        return self._resolve(sad, self.path)

    def tail(self, serder):
        """ Follows .path through serder.sad and returns the serialized
        terminal value

        A str value is parsed as a Saider and returned as its qb64b. A dict or
        list value is serialized with dumps using serder.kind.

        Parameters:
            serder (Serder): the versioned dict in which to resolve .path

        Returns:
            bytes: serialized value at the end of the path

        Raises:
            ValueError: the value at the path is not a str, dict or list
        """
        val = self.resolve(sad=serder.sad)

        if isinstance(val, str):
            return Saider(qb64=val).qb64b
        if isinstance(val, (dict, list)):
            return dumps(val, serder.kind)

        raise ValueError("Bad tail value at {} of {}"
                         .format(self.path, serder.ked))

    @staticmethod
    def _bextify(path):
        """ Returns Base64 text delimited equivalent of path components,
        suitable for variable sized matter

        bytes components are decoded as utf-8 and int components are
        converted with str before validation.

        Parameters:
            path (list): array of path field components

        Returns:
            str: textual representation of SAD path

        Raises:
            ValueError: a component does not match the Base64 pattern Reb64
        """
        vath = []  # valid path components
        for p in path:
            if hasattr(p, "decode"):
                p = p.decode("utf-8")
            elif isinstance(p, int):
                p = str(p)

            if not Reb64.match(p.encode("utf-8")):
                raise ValueError(f"Non Base64 path component = {p}.")

            vath.append(p)

        return "-" + "-".join(vath)

    def _resolve(self, val, ptr):
        """ Walks val following the components of ptr

        For a dict, an all-digit component selects the key at that ordinal
        position, an empty component stops the walk and returns the current
        value, and any other component is used as the key. For a list, the
        component is converted with int and used as the index.

        The ptr list is not modified.

        Parameters:
            val (Optional(dict,list)): the structure to walk
            ptr (list): list of path components (str)

        Returns:
            Value at the end of the chain

        Raises:
            KeyError: ordinal or index out of range, missing dict key, or an
                attempt to walk into a value that is neither dict nor list
        """
        for idx in list(ptr):  # iterate over a copy so caller's list is untouched
            if isinstance(val, dict):
                if idx.isdigit():  # field ordinal instead of field label
                    i = int(idx)
                    keys = list(val)
                    if i >= len(keys):
                        raise KeyError(f"invalid dict pointer index {i} for keys {keys}")
                    val = val[keys[i]]
                elif idx == "":
                    return val
                else:
                    val = val[idx]

            elif isinstance(val, list):
                i = int(idx)
                if i >= len(val):
                    raise KeyError(f"invalid array pointer index {i} for array {val}")
                val = val[i]

            else:
                raise KeyError("invalid traversal type")

        return val
class Verfer(Matter):
    """
    Verfer is a Matter subclass with a method to verify a signature of a
    serialization, using .raw as the verifier (public) key and .code to select
    the signature cipher suite.

    See Matter for inherited attributes and properties.

    Methods:
        verify: verifies signature

    Hidden:
        ._verify is the cipher suite specific verification function selected
            at init time from .code
    """

    def __init__(self, **kwa):
        """
        Assign verification cipher suite function to ._verify

        Parameters: see Matter

        Raises:
            ValueError: .code is not one of the supported verifier codes
        """
        super().__init__(**kwa)

        suites = (
            ((MtrDex.Ed25519N, MtrDex.Ed25519), self._ed25519),
            ((MtrDex.ECDSA_256r1N, MtrDex.ECDSA_256r1), self._secp256r1),
            ((MtrDex.ECDSA_256k1N, MtrDex.ECDSA_256k1), self._secp256k1),
        )
        for codes, verifier in suites:
            if self.code in codes:
                self._verify = verifier
                break
        else:  # no suite matched
            raise ValueError("Unsupported code = {} for verifier.".format(self.code))

    def verify(self, sig, ser):
        """
        Returns True if bytes signature sig verifies on bytes serialization ser
        using .raw as verifier public key for the ._verify cipher suite
        determined by .code

        Parameters:
            sig is bytes signature
            ser is bytes serialization
        """
        return self._verify(sig=sig, ser=ser, key=self.raw)

    @staticmethod
    def _ed25519(sig, ser, key):
        """
        Returns True if verified False otherwise
        Verify Ed25519 sig on ser using key

        Parameters:
            sig is bytes signature
            ser is bytes serialization
            key is bytes public key
        """
        try:  # any exception raised by the verification is reported as False
            pysodium.crypto_sign_verify_detached(sig, ser, key)
        except Exception:
            return False
        return True

    @staticmethod
    def _ecdsa(curve, sig, ser, key):
        """
        Returns True if verified False otherwise
        Verify ECDSA with SHA256 sig on ser using key on the given curve

        Parameters:
            curve is elliptic curve instance such as ec.SECP256R1()
            sig is bytes signature, first 32 bytes are r and the remainder is s
            ser is bytes serialization
            key is bytes encoded point of the public key
        """
        verkey = ec.EllipticCurvePublicKey.from_encoded_point(curve, key)
        r = int.from_bytes(sig[:32], "big")
        s = int.from_bytes(sig[32:], "big")
        der = utils.encode_dss_signature(r, s)  # verify expects DER encoding
        try:
            verkey.verify(der, ser, ec.ECDSA(hashes.SHA256()))
        except exceptions.InvalidSignature:
            return False
        return True

    @staticmethod
    def _secp256r1(sig, ser, key):
        """
        Returns True if verified False otherwise
        Verify secp256r1 sig on ser using key

        Parameters:
            sig is bytes signature
            ser is bytes serialization
            key is bytes public key
        """
        return Verfer._ecdsa(ec.SECP256R1(), sig, ser, key)

    @staticmethod
    def _secp256k1(sig, ser, key):
        """
        Returns True if verified False otherwise
        Verify secp256k1 sig on ser using key

        Parameters:
            sig is bytes signature
            ser is bytes serialization
            key is bytes public key
        """
        return Verfer._ecdsa(ec.SECP256K1(), sig, ser, key)
class Cigar(Matter):
    """
    Cigar is a Matter subclass holding a nonindexed signature together with a
    verfer property.

    From Matter, .raw is the signature and .code is the signature cipher suite.
    Cigar adds the .verfer property to hold the Verfer instance of the
    associated verifier public key, whose .raw is the verifier key and whose
    .code is the verifier cipher suite.

    See Matter for inherited attributes and properties.

    Properties: (Inherited)
        .code is str derivation code to indicate cypher suite
        .size is size (int): number of quadlets when variable sized material
            besides full derivation code else None
        .raw is bytes crypto material only without code
        .qb64 is str in Base64 fully qualified with derivation code + crypto mat
        .qb64b is bytes in Base64 fully qualified with derivation code + crypto mat
        .qb2  is bytes in binary with derivation code + crypto material
        .transferable is Boolean, True when transferable derivation code
            False otherwise
        .digestive is Boolean, True when digest derivation code False otherwise

    Properties:
        .verfer is verfer of public key used to verify signature

    Hidden:
        ._verfer is value for .verfer property
        (see Matter for the remaining hidden attributes and methods)
    """

    def __init__(self, verfer=None, **kwa):
        """
        Assign verfer to ._verfer attribute

        Parameters:
            verfer is Verfer instance of the associated public key, or None
            (see Matter for the remaining parameters passed through kwa)
        """
        super().__init__(**kwa)
        self._verfer = verfer

    @property
    def verfer(self):
        """
        Property verfer:
        Returns the stored Verfer instance.
        Assumes ._verfer is correctly assigned; no validation is done here.
        """
        return self._verfer

    @verfer.setter
    def verfer(self, verfer):
        """ verfer property setter, stores verfer as is """
        self._verfer = verfer
class Signer(Matter):
    """
    Signer is a Matter subclass with a method to create a signature of a
    serialization using:
        .raw as signing (private) key seed,
        .code as cipher suite for signing
        .verfer whose property .raw is the public key for signing.

    The .verfer is always generated at init time from the private key seed
    using .code as the cipher suite for creating the key-pair.

    See Matter for inherited attributes and properties.

    Properties: (inherited)
        code (str): hard part of derivation code to indicate cypher suite
        both (int): hard and soft parts of full text code
        size (int): Number of triplets of bytes including lead bytes
            (quadlets of chars) of variable sized material. Value of soft
            size, ss, part of full text code. Otherwise None.
        rize (int): number of bytes of raw material not including lead bytes
        raw (bytes): private signing key crypto material only without code
        qb64 (str): private signing key Base64 fully qualified with
            derivation code + crypto mat
        qb64b (bytes): private signing key Base64 fully qualified with
            derivation code + crypto mat
        qb2 (bytes): private signing key binary with derivation code +
            crypto material
        transferable (bool): True means transferable derivation code
            False otherwise
        digestive (bool): True means digest derivation code False otherwise

    Properties:
        .verfer is Verfer object instance of public key derived from private
            key seed which is .raw

    Methods:
        sign: create signature

    Hidden:
        ._sign is the cipher suite specific signing function selected at init
            time from .code
        ._verfer is value for .verfer property
    """

    def __init__(self, raw=None, code=MtrDex.Ed25519_Seed, transferable=True, **kwa):
        """
        Assign signing cipher suite function to ._sign and derive .verfer

        Parameters: See Matter for inherited parameters
            raw is bytes crypto material seed or private key. When no material
                at all is supplied a random seed sized for code is generated.
            code is derivation code
            transferable is Boolean True means make verifier code transferable
                False means make it non-transferable

        Raises:
            ValueError: code is not a supported signer seed code
        """
        try:
            super().__init__(raw=raw, code=code, **kwa)
        except EmptyMaterialError:
            # no material supplied so create a random seed of the right size
            if code == MtrDex.Ed25519_Seed:
                size = pysodium.crypto_sign_SEEDBYTES
            elif code == MtrDex.ECDSA_256r1_Seed:
                size = ECDSA_256r1_SEEDBYTES
            elif code == MtrDex.ECDSA_256k1_Seed:
                size = ECDSA_256k1_SEEDBYTES
            else:
                raise ValueError("Unsupported signer code = {}.".format(code))
            raw = pysodium.randombytes(size)
            super().__init__(raw=raw, code=code, **kwa)

        if self.code == MtrDex.Ed25519_Seed:
            self._sign = self._ed25519
            verkey, _ = pysodium.crypto_sign_seed_keypair(self.raw)
            vcode = MtrDex.Ed25519 if transferable else MtrDex.Ed25519N
        elif self.code == MtrDex.ECDSA_256r1_Seed:
            self._sign = self._secp256r1
            verkey = self._ecdsaVerkey(self.raw, ec.SECP256R1())
            vcode = MtrDex.ECDSA_256r1 if transferable else MtrDex.ECDSA_256r1N
        elif self.code == MtrDex.ECDSA_256k1_Seed:
            self._sign = self._secp256k1
            verkey = self._ecdsaVerkey(self.raw, ec.SECP256K1())
            vcode = MtrDex.ECDSA_256k1 if transferable else MtrDex.ECDSA_256k1N
        else:
            raise ValueError("Unsupported signer code = {}.".format(self.code))

        self._verfer = Verfer(raw=verkey, code=vcode)

    @property
    def verfer(self):
        """
        Property verfer:
        Returns the Verfer instance derived at init time.
        """
        return self._verfer

    def sign(self, ser, index=None, only=False, ondex=None, **kwa):
        """
        Returns either Cigar or Siger (indexed) instance of cryptographic
        signature material on bytes serialization ser

        If index is None return Cigar instance
        Else return Siger instance

        Parameters:
            ser (bytes): serialization to be signed
            index (int): main index of associated verifier key in event keys
            only (bool): True means main index only list, ondex ignored
                         False means both index lists (default), ondex used
            ondex (int | None): other index offset into list such as prior next
        """
        return self._sign(ser=ser,
                          seed=self.raw,
                          verfer=self.verfer,
                          index=index,
                          only=only,
                          ondex=ondex,
                          **kwa)

    @staticmethod
    def _ecdsaKey(seed, curve):
        """Returns the ECDSA private key on curve whose secret scalar is the
        big-endian integer value of seed (bytes)."""
        return ec.derive_private_key(int.from_bytes(seed, byteorder="big"), curve)

    @staticmethod
    def _ecdsaVerkey(seed, curve):
        """Returns bytes X9.62 compressed point of the public key that
        corresponds to seed (bytes) on curve."""
        sigkey = Signer._ecdsaKey(seed, curve)
        return sigkey.public_key().public_bytes(encoding=Encoding.X962,
                                                format=PublicFormat.CompressedPoint)

    @staticmethod
    def _ecdsaSign(ser, seed, curve):
        """Returns bytearray raw ECDSA with SHA256 signature of ser (bytes)
        made with seed (bytes) on curve: 32 bytes of r followed by 32 bytes
        of s, both big-endian."""
        der = Signer._ecdsaKey(seed, curve).sign(ser, ec.ECDSA(hashes.SHA256()))
        r, s = utils.decode_dss_signature(der)
        sig = bytearray(r.to_bytes(32, "big"))
        sig.extend(s.to_bytes(32, "big"))
        return sig

    @staticmethod
    def _indexCode(index, only, ondex, *, small, big, smallCrt, bigCrt):
        """Returns tuple (code, ondex) of the Indexer code and the effective
        ondex for an indexed signature.

        Parameters:
            index (int): main index
            only (bool): True means current only, so ondex is forced to None
            ondex (int | None): other index, None defaults to index when
                only is False
            small: code for both lists with same index that fits small
            big: code for both lists otherwise
            smallCrt: code for current only with index that fits small
            bigCrt: code for current only otherwise
        """
        # should add Indexer class method to get ms main index size for given code
        fits = index <= 63  # (64 ** ms - 1) where ms is main index size

        if only:  # only main index, ondex not used
            return (smallCrt if fits else bigCrt), None

        if ondex is None:
            ondex = index  # enable default to be same
        if ondex == index and fits:  # both same and small
            return small, ondex
        return big, ondex  # otherwise big or both not same so use big both

    @staticmethod
    def _ed25519(ser, seed, verfer, index, only=False, ondex=None, **kwa):
        """
        Returns signature as either Cigar or Siger instance as appropriate for
        Ed25519 digital signatures given index and ondex values

        The seed's code determines the crypto key-pair algorithm and signing
        suite. The signature type, Cigar or Siger, and when indexed the Siger
        code are determined from the index values (index, only, ondex) by
        assuming that the index values are intentional.

        Parameters:
            ser (bytes): serialization to be signed
            seed (bytes):  raw binary seed (private key)
            verfer (Verfer): instance. verfer.raw is public key
            index (int | None): main index offset into list such as current
                signing. None means return non-indexed Cigar. Not None means
                return indexed Siger with Indexer code derived from index,
                only, and ondex values
            only (bool): True means main index only list, ondex ignored
                         False means both index lists (default), ondex used
            ondex (int | None): other index offset into list such as prior next
        """
        # signing key handed to pysodium is raw seed + raw verkey
        sig = pysodium.crypto_sign_detached(ser, seed + verfer.raw)

        if index is None:  # Must be Cigar i.e. non-indexed signature
            return Cigar(raw=sig, code=MtrDex.Ed25519_Sig, verfer=verfer)

        # Must be Siger i.e. indexed signature
        code, ondex = Signer._indexCode(index, only, ondex,
                                        small=IdrDex.Ed25519_Sig,
                                        big=IdrDex.Ed25519_Big_Sig,
                                        smallCrt=IdrDex.Ed25519_Crt_Sig,
                                        bigCrt=IdrDex.Ed25519_Big_Crt_Sig)
        return Siger(raw=sig, code=code, index=index, ondex=ondex, verfer=verfer)

    @staticmethod
    def _secp256r1(ser, seed, verfer, index, only=False, ondex=None, **kwa):
        """
        Returns signature as either Cigar or Siger instance as appropriate for
        secp256r1 digital signatures given index and ondex values

        The seed's code determines the crypto key-pair algorithm and signing
        suite. The signature type, Cigar or Siger, and when indexed the Siger
        code are determined from the index values (index, only, ondex) by
        assuming that the index values are intentional.

        Parameters:
            ser (bytes): serialization to be signed
            seed (bytes):  raw binary seed (private key)
            verfer (Verfer): instance. verfer.raw is public key
            index (int | None): main index offset into list such as current
                signing. None means return non-indexed Cigar. Not None means
                return indexed Siger with Indexer code derived from index,
                only, and ondex values
            only (bool): True means main index only list, ondex ignored
                         False means both index lists (default), ondex used
            ondex (int | None): other index offset into list such as prior next
        """
        sig = Signer._ecdsaSign(ser, seed, ec.SECP256R1())

        if index is None:  # Must be Cigar i.e. non-indexed signature
            return Cigar(raw=sig, code=MtrDex.ECDSA_256r1_Sig, verfer=verfer)

        # Must be Siger i.e. indexed signature
        code, ondex = Signer._indexCode(index, only, ondex,
                                        small=IdrDex.ECDSA_256r1_Sig,
                                        big=IdrDex.ECDSA_256r1_Big_Sig,
                                        smallCrt=IdrDex.ECDSA_256r1_Crt_Sig,
                                        bigCrt=IdrDex.ECDSA_256r1_Big_Crt_Sig)
        return Siger(raw=sig, code=code, index=index, ondex=ondex, verfer=verfer)

    @staticmethod
    def _secp256k1(ser, seed, verfer, index, only=False, ondex=None, **kwa):
        """
        Returns signature as either Cigar or Siger instance as appropriate for
        secp256k1 digital signatures given index and ondex values

        The seed's code determines the crypto key-pair algorithm and signing
        suite. The signature type, Cigar or Siger, and when indexed the Siger
        code are determined from the index values (index, only, ondex) by
        assuming that the index values are intentional.

        Parameters:
            ser (bytes): serialization to be signed
            seed (bytes):  raw binary seed (private key)
            verfer (Verfer): instance. verfer.raw is public key
            index (int | None): main index offset into list such as current
                signing. None means return non-indexed Cigar. Not None means
                return indexed Siger with Indexer code derived from index,
                only, and ondex values
            only (bool): True means main index only list, ondex ignored
                         False means both index lists (default), ondex used
            ondex (int | None): other index offset into list such as prior next
        """
        sig = Signer._ecdsaSign(ser, seed, ec.SECP256K1())

        if index is None:  # Must be Cigar i.e. non-indexed signature
            return Cigar(raw=sig, code=MtrDex.ECDSA_256k1_Sig, verfer=verfer)

        # Must be Siger i.e. indexed signature
        code, ondex = Signer._indexCode(index, only, ondex,
                                        small=IdrDex.ECDSA_256k1_Sig,
                                        big=IdrDex.ECDSA_256k1_Big_Sig,
                                        smallCrt=IdrDex.ECDSA_256k1_Crt_Sig,
                                        bigCrt=IdrDex.ECDSA_256k1_Big_Crt_Sig)
        return Siger(raw=sig, code=code, index=index, ondex=ondex, verfer=verfer)
# def derive_index_code(code, index, only=False, ondex=None, **kwa): # # should add Indexer class method to get ms main index size for given code # if only: # only main index ondex not used # ondex = None # if index <= 63: # (64 ** ms - 1) where ms is main index size, use small current only # if code == MtrDex.Ed25519_Seed: # indxSigCode = IdrDex.Ed25519_Crt_Sig # elif code == MtrDex.ECDSA_256r1_Seed: # indxSigCode = IdrDex.ECDSA_256r1_Crt_Sig # elif code == MtrDex.ECDSA_256k1_Seed: # indxSigCode = IdrDex.ECDSA_256k1_Crt_Sig # else: # raise ValueError("Unsupported signer code = {}.".format(code)) # else: # use big current only # if code == MtrDex.Ed25519_Seed: # indxSigCode = IdrDex.Ed25519_Big_Crt_Sig # elif code == MtrDex.ECDSA_256r1_Seed: # indxSigCode = IdrDex.ECDSA_256r1_Big_Crt_Sig # elif code == MtrDex.ECDSA_256k1_Seed: # indxSigCode = IdrDex.ECDSA_256k1_Big_Crt_Sig # else: # raise ValueError("Unsupported signer code = {}.".format(code)) # else: # both # if ondex == None: # ondex = index # enable default to be same # if ondex == index and index <= 63: # both same and small so use small both same # if code == MtrDex.Ed25519_Seed: # indxSigCode = IdrDex.Ed25519_Sig # elif code == MtrDex.ECDSA_256r1_Seed: # indxSigCode = IdrDex.ECDSA_256r1_Sig # elif code == MtrDex.ECDSA_256k1_Seed: # indxSigCode = IdrDex.ECDSA_256k1_Sig # else: # raise ValueError("Unsupported signer code = {}.".format(code)) # else: # otherwise big or both not same so use big both # if code == MtrDex.Ed25519_Seed: # indxSigCode = IdrDex.Ed25519_Big_Sig # elif code == MtrDex.ECDSA_256r1_Seed: # indxSigCode = IdrDex.ECDSA_256r1_Big_Sig # elif code == MtrDex.ECDSA_256k1_Seed: # indxSigCode = IdrDex.ECDSA_256k1_Big_Sig # else: # raise ValueError("Unsupported signer code = {}.".format(code)) # return (indxSigCode, ondex)
class Salter(Matter):
    """
    Salter is a Matter subclass that maintains a random salt for secrets
    (private keys). Its .raw is the random salt and its .code is the cipher
    suite for the salt.

    Class Attributes:
        Tier is the default security tier used when none is given at init

    Attributes:
        .tier is the security tier used by default when stretching

    Inherited Properties: (see Matter)
        .pad  is int number of pad chars given raw
        .code is str derivation code to indicate cypher suite
        .raw is bytes crypto material only without code
        .index is int count of attached crypto material by context (receipts)
        .qb64 is str in Base64 fully qualified with derivation code + crypto mat
        .qb64b is bytes in Base64 fully qualified with derivation code + crypto mat
        .qb2  is bytes in binary with derivation code + crypto material
        .transferable is Boolean, True when transferable derivation code
            False otherwise

    Methods:
        stretch: returns raw seed derived from the salt and a path
        signer: returns Signer whose seed is stretched from the salt
        signers: returns list of Signers with consecutive derivation paths
    """
    Tier = Tiers.low

    def __init__(self, raw=None, code=MtrDex.Salt_128, tier=None, **kwa):
        """
        Initialize salter's raw and code

        Inherited Parameters:
            raw is bytes of unqualified crypto material usable for crypto
                operations. When no material at all is supplied a random salt
                is generated.
            qb64b is bytes of fully qualified crypto material
            qb64 is str or bytes of fully qualified crypto material
            qb2 is bytes of fully qualified crypto material
            code is str of derivation code

        Parameters:
            tier is security tier for .tier, None means use class default Tier

        Raises:
            ValueError: code is not MtrDex.Salt_128
        """
        try:
            super().__init__(raw=raw, code=code, **kwa)
        except EmptyMaterialError:
            # no material supplied so create a random salt
            if code != MtrDex.Salt_128:
                raise ValueError("Unsupported salter code = {}.".format(code))
            raw = pysodium.randombytes(pysodium.crypto_pwhash_SALTBYTES)
            super().__init__(raw=raw, code=code, **kwa)

        if self.code not in (MtrDex.Salt_128,):
            raise ValueError("Unsupported salter code = {}.".format(self.code))

        self.tier = self.Tier if tier is None else tier

    def stretch(self, *, size=32, path="", tier=None, temp=False):
        """
        Returns (bytes): raw binary seed (secret) derived from path and .raw
        and stretched to size using the argon2id stretching algorithm.

        Parameters:
            size (int): number of bytes in stretched seed
            path (str): unique chars used in derivation of seed (secret)
            tier (str): value from Tierage for security level of stretch,
                None means use .tier
            temp is Boolean, True means use quick method to stretch salt
                    for testing only, Otherwise use time set by tier to stretch

        Raises:
            ValueError: tier is not one of Tiers.low, Tiers.med, Tiers.high
                (only checked when temp is False)
        """
        if tier is None:
            tier = self.tier

        if temp:
            # pysodium.crypto_pwhash_OPSLIMIT_MIN, pysodium.crypto_pwhash_MEMLIMIT_MIN
            opslimit, memlimit = 1, 8192
        else:
            levels = (
                (Tiers.low, (2, 67108864)),  # ..._OPSLIMIT/MEMLIMIT_INTERACTIVE
                (Tiers.med, (3, 268435456)),  # ..._OPSLIMIT/MEMLIMIT_MODERATE
                (Tiers.high, (4, 1073741824)),  # ..._OPSLIMIT/MEMLIMIT_SENSITIVE
            )
            for level, limits in levels:
                if tier == level:
                    opslimit, memlimit = limits
                    break
            else:
                raise ValueError("Unsupported security tier = {}.".format(tier))

        # stretch algorithm is argon2id
        # NOTE(review): path is handed to pysodium unchanged; altering how it
        # is encoded would presumably change every derived seed -- confirm
        # before touching.
        return pysodium.crypto_pwhash(outlen=size,
                                      passwd=path,
                                      salt=self.raw,
                                      opslimit=opslimit,
                                      memlimit=memlimit,
                                      alg=pysodium.crypto_pwhash_ALG_ARGON2ID13)

    def signer(self, *, code=MtrDex.Ed25519_Seed, transferable=True, path="",
               tier=None, temp=False):
        """
        Returns Signer instance whose .raw secret is derived from path and
        salter's .raw and stretched to the raw size given by code.
        The signer's public key for its .verfer is derived from code and
        transferable.

        Parameters:
            code is str code of secret crypto suite
            transferable is Boolean, True means use transferable code for
                public key
            path is str of unique chars used in derivation of secret seed
                for signer
            tier is str Tierage security level
            temp is Boolean, True means use quick method to stretch salt
                    for testing only, Otherwise use more time to stretch
        """
        size = Matter._rawSize(code)
        seed = self.stretch(size=size, path=path, tier=tier, temp=temp)
        return Signer(raw=seed, code=code, transferable=transferable)

    def signers(self, count=1, start=0, path="", **kwa):
        """
        Returns list of count number of Signer instances with unique derivation
        paths. Each path is the path prefix followed by the lowercase hex of
        start plus the offset, for offsets from 0 to count - 1.
        See .signer for the parameters used to create each signer.
        """
        made = []
        for offset in range(count):
            made.append(self.signer(path=f"{path}{offset + start:x}", **kwa))
        return made
class Cipher(Matter):
    """
    Cipher is a Matter subclass holding the cipher text of a secret that may
    be either a secret seed (private key) or a secret salt, with the
    appropriate CESR code to indicate which kind (which indicates size).

    The cipher text is created with asymmetric encryption using an unrelated
    (public, private) encryption/decryption key pair. The public key is used
    for encryption, the private key for decryption. The default is to use
    X25519 sealed box encryption.

    The Cipher instance's .raw is the raw binary encrypted cipher text and its
    .code indicates what type of secret has been encrypted. The cipher suite
    used for the encryption/decryption is implied by the context where the
    cipher is used.

    See Matter for inherited attributes and properties.

    Methods:
        decrypt: returns the plain text
    """

    def __init__(self, raw=None, code=None, **kwa):
        """
        Parameters:
            raw (Union[bytes, str]): cipher text. A str is utf-8 encoded to
                bytes.
            code (str): derivation code of the cipher. When None and raw is
                given, the code is inferred from the size in bytes of raw.
            (see Matter for the remaining parameters passed through kwa)

        Raises:
            ValueError: resulting code is neither MtrDex.X25519_Cipher_Salt
                nor MtrDex.X25519_Cipher_Seed
        """
        # Ensure bytes not str before measuring, so that the inferred code is
        # based on the size of the bytes that are actually stored.
        if hasattr(raw, "encode"):
            raw = raw.encode("utf-8")

        if raw is not None and code is None:
            size = len(raw)
            if size == Matter._rawSize(MtrDex.X25519_Cipher_Salt):
                code = MtrDex.X25519_Cipher_Salt
            elif size == Matter._rawSize(MtrDex.X25519_Cipher_Seed):
                code = MtrDex.X25519_Cipher_Seed
            # otherwise code stays None and is left for Matter to handle

        super().__init__(raw=raw, code=code, **kwa)

        if self.code not in (MtrDex.X25519_Cipher_Salt, MtrDex.X25519_Cipher_Seed):
            raise ValueError("Unsupported cipher code = {}.".format(self.code))

    def decrypt(self, prikey=None, seed=None):
        """
        Returns plain text as Matter instance (Signer or Salter) of the
        cryptographic cipher text material given by .raw.
        The encrypted plain text is fully qualified (qb64) so the derivation
        code of the plain text is preserved through an encryption/decryption
        round trip.

        Uses either the decryption key given by prikey or derives the
        decryption key from the private signing key seed.

        Parameters:
            prikey (Union[bytes, str]): qb64b or qb64 serialization of private
                decryption key
            seed (Union[bytes, str]): qb64b or qb64 serialization of private
                signing key seed used to derive private decryption key
        """
        decrypter = Decrypter(qb64b=prikey, seed=seed)
        return decrypter.decrypt(ser=self.qb64b)
[docs] class Encrypter(Matter): """ Encrypter is Matter subclass with method to create a cipher text of a fully qualified (qb64) private key/seed where private key/seed is the plain text. Encrypter uses assymetric (public, private) key encryption of a serialization (plain text). Using its .raw as the encrypting (public) key and its .code to indicate the cipher suite for the encryption operation. For example .code == MtrDex.X25519 indicates that X25519 sealed box encyrption is used. The encryption key may be derived from an Ed25519 signing public key that associated with a nontransferable or basic derivation self certifying identifier. This allows use of the self certifying identifier to track or manage the encryption/decryption key pair. And could be used to provide additional authentication operations for using the encryption/decryption key pair. Support for this is provided at init time with the verkey parameter which allows deriving the encryption public key from the fully qualified verkey (signature verification key). See Matter for inherited attributes and properties: Methods: encrypt: returns cipher text """
[docs] def __init__(self, raw=None, code=MtrDex.X25519, verkey=None, **kwa): """ Assign encrypting cipher suite function to ._encrypt Parameters: See Matter for inherted parameters such as qb64, qb64b raw (bytes): public encryption key qb64b (bytes): fully qualified public encryption key qb64 (str): fully qualified public encryption key code (str): derivation code for public encryption key verkey (Union[bytes, str]): qb64b or qb64 of verkey used to derive raw """ if not raw and verkey: verfer = Verfer(qb64b=verkey) if verfer.code not in (MtrDex.Ed25519N, MtrDex.Ed25519): raise ValueError("Unsupported verkey derivation code = {}." "".format(verfer.code)) # convert signing public key to encryption public key raw = pysodium.crypto_sign_pk_to_box_pk(verfer.raw) super(Encrypter, self).__init__(raw=raw, code=code, **kwa) if self.code == MtrDex.X25519: self._encrypt = self._x25519 else: raise ValueError("Unsupported encrypter code = {}.".format(self.code))
[docs] def verifySeed(self, seed): """ Returns: Boolean: True means private signing key seed corresponds to public signing key verkey used to derive encrypter's .raw public encryption key. Parameters: seed (Union(bytes,str)): qb64b or qb64 serialization of private signing key seed """ signer = Signer(qb64b=seed) verkey, sigkey = pysodium.crypto_sign_seed_keypair(signer.raw) pubkey = pysodium.crypto_sign_pk_to_box_pk(verkey) return (pubkey == self.raw)
[docs] def encrypt(self, ser=None, matter=None): """ Returns: Cipher instance of cipher text encryption of plain text serialization provided by either ser or Matter instance when provided. Parameters: ser (Union[bytes,str]): qb64b or qb64 serialization of plain text matter (Matter): plain text as Matter instance of seed or salt to be encrypted """ if not (ser or matter): raise EmptyMaterialError("Neither ser or plain are provided.") if ser: matter = Matter(qb64b=ser) if matter.code == MtrDex.Salt_128: # future other salt codes code = MtrDex.X25519_Cipher_Salt elif matter.code == MtrDex.Ed25519_Seed: # future other seed codes code = MtrDex.X25519_Cipher_Seed else: raise ValueError("Unsupported plain text code = {}.".format(matter.code)) # encrypting fully qualified qb64 version of plain text ensures its # derivation code round trips through eventual decryption return (self._encrypt(ser=matter.qb64b, pubkey=self.raw, code=code))
@staticmethod
def _x25519(ser, pubkey, code):
    """
    Seal ser to pubkey with pysodium.crypto_box_seal.

    Returns:
        Cipher instance whose raw is the sealed cipher text and whose code
        is the provided code

    Parameters:
        ser (Union[bytes, str]): qb64b or qb64 serialization of seed or salt
            to be encrypted.
        pubkey (bytes): raw binary serialization of encryption public key
        code (str): derivation code of serialized plain text seed or salt
    """
    sealed = pysodium.crypto_box_seal(ser, pubkey)
    return Cipher(raw=sealed, code=code)
class Decrypter(Matter):
    """
    Decrypter is Matter subclass with method to decrypt the plain text from a
    cipher text of a fully qualified (qb64) private key/seed where the private
    key/seed is the plain text. Decrypter uses asymmetric (public, private) key
    decryption of the cipher text using its .raw as the decrypting (private)
    key and its .code to indicate the cipher suite for the decryption
    operation.

    For example .code == MtrDex.X25519_Private selects X25519 sealed box
    decryption.

    The decryption key may be derived from an Ed25519 signing key seed. This
    is supported at init time with the seed parameter, from which the private
    decryption key is derived.

    See Matter for inherited attributes and properties:

    Methods:
        decrypt: returns plain text as Salter or Signer from cipher text
    """

    def __init__(self, code=MtrDex.X25519_Private, seed=None, **kwa):
        """
        Set up the private decryption key and bind the cipher suite function
        selected by .code to ._decrypt

        Parameters:  See Matter for inherited parameters
            raw (bytes): private decryption key derived from seed (private
                signing key)
            qb64b (bytes): fully qualified private decryption key
            qb64 (str): fully qualified private decryption key
            code (str): derivation code for private decryption key
            seed (Union[bytes, str]): qb64b or qb64 of signing key seed used
                to derive raw which is the private decryption key. Only
                consulted when Matter raises EmptyMaterialError.

        Raises:
            EmptyMaterialError: re-raised when no material and no seed
            ValueError: when seed code is not Ed25519_Seed or when resulting
                .code is not X25519_Private
        """
        try:
            super().__init__(code=code, **kwa)
        except EmptyMaterialError:
            if not seed:
                raise
            signer = Signer(qb64b=seed)
            if signer.code != MtrDex.Ed25519_Seed:
                raise ValueError("Unsupported signing seed derivation code = {}."
                                 "".format(signer.code))
            # sigkey is raw seed followed by raw verkey
            sigkey = signer.raw + signer.verfer.raw
            prikey = pysodium.crypto_sign_sk_to_box_sk(sigkey)  # raw private encrypt key
            super().__init__(raw=prikey, code=code, **kwa)

        if self.code != MtrDex.X25519_Private:
            raise ValueError("Unsupported decrypter code = {}.".format(self.code))
        self._decrypt = self._x25519

    def decrypt(self, ser=None, cipher=None, transferable=False):
        """
        Decrypt cipher text given by ser or cipher.

        Returns:
            Salter or Signer instance derived from plain text decrypted from
            encrypted cipher text material given by ser or cipher. Plain text
            that was originally encrypted should always be fully qualified
            (qb64b) so that the derivation code of the plain text is preserved
            through the encryption/decryption round trip.

        Parameters:
            ser (Union[bytes,str]): qb64b or qb64 serialization of cipher text.
                Takes precedence over cipher when provided.
            cipher (Cipher): optional Cipher instance when ser is None
            transferable (bool): True means associated verfer of returned
                signer is transferable. False means non-transferable

        Raises:
            EmptyMaterialError: when neither ser nor cipher is provided
        """
        if not ser and not cipher:
            raise EmptyMaterialError("Neither ser or cipher are provided.")

        if ser:
            # building a Cipher validates the derivation code of ser
            cipher = Cipher(qb64b=ser)

        return self._decrypt(cipher=cipher,
                             prikey=self.raw,
                             transferable=transferable)

    @staticmethod
    def _x25519(cipher, prikey, transferable=False):
        """
        Open an X25519 sealed box.

        Returns:
            plain text as Salter or Signer instance depending on the cipher
            code

        Parameters:
            cipher (Cipher): instance of encrypted seed or salt
            prikey (bytes): raw binary decryption private key derived from
                signing seed or sigkey
            transferable (bool): True means associated verfer of returned
                signer is transferable. False means non-transferable

        Raises:
            ValueError: when cipher.code is neither X25519_Cipher_Salt nor
                X25519_Cipher_Seed
        """
        pubkey = pysodium.crypto_scalarmult_curve25519_base(prikey)
        # plain is expected to be qb64b so its derivation code round trips
        plain = pysodium.crypto_box_seal_open(cipher.raw, pubkey, prikey)

        if cipher.code == MtrDex.X25519_Cipher_Salt:
            return Salter(qb64b=plain)
        if cipher.code == MtrDex.X25519_Cipher_Seed:
            return Signer(qb64b=plain, transferable=transferable)
        raise ValueError("Unsupported cipher text code = {}.".format(cipher.code))
class Diger(Matter):
    """
    Diger is Matter subclass with method to verify digest of serialization

    See Matter for inherited attributes and properties:

    Methods:
        verify: verifies digest given ser
        compare: compares provided digest of ser to this digest of ser.
                 Enables digest agility so that digests made with different
                 digest algorithms may be compared.
    """

    def __init__(self, raw=None, ser=None, code=MtrDex.Blake3_256, **kwa):
        """
        Assign digest verification function to ._verify

        See Matter for inherited parameters

        Inherited Parameters:
            raw is bytes of unqualified crypto material usable for crypto operations
            qb64b is bytes of fully qualified crypto material
            qb64 is str or bytes of fully qualified crypto material
            qb2 is bytes of fully qualified crypto material
            code is str of derivation code

        Parameters:
           ser is bytes serialization from which raw is computed when Matter
               raises EmptyMaterialError for the other provided material

        Raises:
            EmptyMaterialError: re-raised when no material and no ser
            InvalidValueError: when code (or resulting .code) is not one of
                Blake3_256, Blake2b_256, Blake2s_256, SHA3_256, SHA2_256
        """
        # Should implement all digests in DigCodex instance DigDex
        try:
            super().__init__(raw=raw, code=code, **kwa)
        except EmptyMaterialError:
            if not ser:
                raise
            if code == MtrDex.Blake3_256:
                dig = blake3.blake3(ser).digest()
            elif code == MtrDex.Blake2b_256:
                dig = hashlib.blake2b(ser, digest_size=32).digest()
            elif code == MtrDex.Blake2s_256:
                dig = hashlib.blake2s(ser, digest_size=32).digest()
            elif code == MtrDex.SHA3_256:
                dig = hashlib.sha3_256(ser).digest()
            elif code == MtrDex.SHA2_256:
                dig = hashlib.sha256(ser).digest()
            else:
                # f-string so the offending code actually appears in the message
                raise InvalidValueError(f"Unsupported code={code} for diger.")

            super().__init__(raw=dig, code=code, **kwa)

        # derivation code -> verification function
        verifiers = {
            MtrDex.Blake3_256: self._blake3_256,
            MtrDex.Blake2b_256: self._blake2b_256,
            MtrDex.Blake2s_256: self._blake2s_256,
            MtrDex.SHA3_256: self._sha3_256,
            MtrDex.SHA2_256: self._sha2_256,
        }
        verifier = verifiers.get(self.code)
        if verifier is None:
            # f-string so the offending code actually appears in the message
            raise InvalidValueError(f"Unsupported code={self.code} for diger.")
        self._verify = verifier

    def verify(self, ser):
        """
        Returns True if raw digest of ser bytes (serialization) matches .raw
        using .raw as reference digest for ._verify digest algorithm determined
        by .code

        Parameters:
            ser (bytes): serialization to be digested and compared to .raw
        """
        return self._verify(ser=ser, raw=self.raw)

    def compare(self, ser, dig=None, diger=None):
        """
        Returns True if dig and .qb64 or .qb64b match or
            if both .raw and dig are valid digests of ser
            Otherwise returns False

        Parameters:
            ser is bytes serialization
            dig is qb64b or qb64 digest of ser to compare with self
            diger is Diger instance of digest of ser to compare with self

            if both supplied dig takes precedence

        Raises:
            ValueError: when both dig and diger are None

        If both match then as optimization returns True and does not verify
            either as digest of ser
        Else If both have same code but do not match then as optimization
            returns False and does not verify if either is digest of ser
        Else recalcs both digests using each one's code to verify they
            are both digests of ser regardless of matching codes.
        """
        if dig is not None:
            if hasattr(dig, "encode"):
                dig = dig.encode('utf-8')  # makes bytes

            if dig == self.qb64b:  # matching
                return True

            diger = Diger(qb64b=dig)  # extract code

        elif diger is not None:
            if diger.qb64b == self.qb64b:
                return True

        else:
            raise ValueError("Both dig and diger may not be None.")

        if diger.code == self.code:  # digests do not match but same code
            return False

        # different codes so both must independently verify against ser
        if diger.verify(ser=ser) and self.verify(ser=ser):
            return True

        return False

    @staticmethod
    def _blake3_256(ser, raw):
        """
        Returns True if verified False otherwise
        Verify blake3_256 digest of ser matches raw

        Parameters:
            ser is bytes serialization
            raw is bytes reference digest
        """
        return blake3.blake3(ser).digest() == raw

    @staticmethod
    def _blake2b_256(ser, raw):
        """
        Returns True if verified False otherwise
        Verify blake2b_256 digest of ser matches raw

        Parameters:
            ser is bytes serialization
            raw is bytes reference digest
        """
        return hashlib.blake2b(ser, digest_size=32).digest() == raw

    @staticmethod
    def _blake2s_256(ser, raw):
        """
        Returns True if verified False otherwise
        Verify blake2s_256 digest of ser matches raw

        Parameters:
            ser is bytes serialization
            raw is bytes reference digest
        """
        return hashlib.blake2s(ser, digest_size=32).digest() == raw

    @staticmethod
    def _sha3_256(ser, raw):
        """
        Returns True if verified False otherwise
        Verify sha3_256 digest of ser matches raw

        Parameters:
            ser is bytes serialization
            raw is bytes reference digest
        """
        return hashlib.sha3_256(ser).digest() == raw

    @staticmethod
    def _sha2_256(ser, raw):
        """
        Returns True if verified False otherwise
        Verify sha2_256 (sha256) digest of ser matches raw

        Parameters:
            ser is bytes serialization
            raw is bytes reference digest
        """
        return hashlib.sha256(ser).digest() == raw
@dataclass(frozen=True)
class PreCodex:
    """
    PreCodex is the codex of all identifier prefix derivation codes.
    It is needed to verify valid inception events.

    Only defined codes are listed. Undefined codes are left out so that
    inclusion (exclusion) testing via the 'in' operator works.
    """
    # basic derivations from an Ed25519 verification key
    Ed25519N: str = 'B'  # non-transferable
    Ed25519: str = 'D'  # transferable
    # self-addressing derivations from a 256 bit digest
    Blake3_256: str = 'E'  # Blake3
    Blake2b_256: str = 'F'  # Blake2b
    Blake2s_256: str = 'G'  # Blake2s
    SHA3_256: str = 'H'  # SHA3
    SHA2_256: str = 'I'  # SHA2
    # self-addressing derivations from a 512 bit digest
    Blake3_512: str = '0D'  # Blake3
    Blake2b_512: str = '0E'  # Blake2b
    SHA3_512: str = '0F'  # SHA3
    SHA2_512: str = '0G'  # SHA2
    # basic derivations from an ECDSA key
    ECDSA_256k1N: str = '1AAA'  # secp256k1 verification key, non-transferable
    ECDSA_256k1: str = '1AAB'  # secp256k1 public verification or encryption key
    ECDSA_256r1N: str = '1AAI'  # secp256r1 verification key, non-transferable
    ECDSA_256r1: str = '1AAJ'  # secp256r1 verification or encryption key

    def __iter__(self):
        # iterate over the code values so `code in PreDex` works
        yield from astuple(self)


PreDex = PreCodex()  # shared instance used for membership tests
class Prefixer(Matter):
    """
    Prefixer is Matter subclass for autonomic identifier prefix using
    derivation as determined by code from ked

    Inherited Properties:  (see Matter)
        .code is str derivation code to indicate cypher suite
        .raw is bytes crypto material only without code
        .qb64 is str in Base64 fully qualified with derivation code + crypto mat
        .qb64b is bytes in Base64 fully qualified with derivation code + crypto mat
        .qb2  is bytes in binary with derivation code + crypto material

    Methods:
        derive(): Derives (raw, code) of aid prefix from a ked
        verify(): Verifies derivation of aid prefix from a ked

    Hidden:
        ._derive is derivation method selected by code
        ._verify is verification method selected by .code
    """
    Dummy = "#"  # dummy spaceholder char for pre. Must not be a valid Base64 char

    def __init__(self, raw=None, code=None, ked=None, allows=None, **kwa):
        """
        Assign ._derive to derive aid prefix from ked (only when the prefix
        has to be derived) and assign ._verify to verify derivation of aid
        prefix from ked

        Default code is None to force EmptyMaterialError when only raw is
        provided but not code.

        Inherited Parameters:
            raw is bytes of unqualified crypto material usable for crypto operations
            qb64b is bytes of fully qualified crypto material
            qb64 is str or bytes of fully qualified crypto material
            qb2 is bytes of fully qualified crypto material
            code is str of derivation code

        Parameters:
            ked (dict): key event dict used to derive the prefix when Matter
                raises EmptyMaterialError for the other provided material
            allows (list): allowed codes for prefix. When None then all
                supported codes are allowed. This enables a particular use
                case to restrict the codes allowed to a subset of all
                supported. Only checked when the prefix is derived from ked.

        Raises:
            EmptyMaterialError: re-raised when derivation from ked is not
                possible
            ValueError: when code is not allowed or not supported
        """
        nontrans = (MtrDex.Ed25519N, MtrDex.ECDSA_256r1N, MtrDex.ECDSA_256k1N)
        trans = (MtrDex.Ed25519, MtrDex.ECDSA_256r1, MtrDex.ECDSA_256k1)

        try:
            super().__init__(raw=raw, code=code, **kwa)
        except EmptyMaterialError:
            # derivation needs a ked and either a code or a pre in the ked
            if not (ked and (code or "i" in ked)):
                raise

            if not code:  # get code from pre in ked
                super().__init__(qb64=ked["i"], code=code, **kwa)
                code = self.code

            if allows is not None and code not in allows:
                raise ValueError("Unallowed code={} for prefixer.".format(code))

            if code in nontrans:
                self._derive = self._derive_non_transferable
            elif code in trans:
                self._derive = self._derive_transferable
            elif code == MtrDex.Blake3_256:
                self._derive = self._derive_blake3_256
            else:
                raise ValueError("Unsupported code = {} for prefixer.".format(code))

            # use ked and ._derive from code to derive aid prefix and code
            raw, code = self.derive(ked=ked)
            super().__init__(raw=raw, code=code, **kwa)

        if self.code in nontrans:
            self._verify = self._verify_non_transferable
        elif self.code in trans:
            self._verify = self._verify_transferable
        elif self.code == MtrDex.Blake3_256:
            self._verify = self._verify_blake3_256
        else:
            raise ValueError("Unsupported code = {} for prefixer.".format(self.code))

    def derive(self, ked):
        """
        Returns tuple (raw, code) of aid prefix as derived from key event dict
        ked using the derivation code specific ._derive method

        Parameters:
            ked is inception key event dict

        Raises:
            ValueError: when ked["t"] is not one of icp, dip, vcp, iss
            ValidationError: when a label required for the ilk is missing
        """
        ilk = ked["t"]
        if ilk not in (Ilks.icp, Ilks.dip, Ilks.vcp, Ilks.iss):
            raise ValueError("Nonincepting ilk={} for prefix derivation.".format(ilk))

        for label in getattr(Labels, ilk):
            if label not in ked:
                raise ValidationError("Missing element = {} from {} event for "
                                      "evt = {}.".format(label, ilk, ked))

        return self._derive(ked=ked)

    def verify(self, ked, prefixed=False):
        """
        Returns True if derivation from ked for .code matches .qb64 and
                If prefixed also verifies ked["i"] matches .qb64
                False otherwise

        Parameters:
            ked is inception key event dict
            prefixed is Boolean, True means also check ked["i"] against .qb64

        Raises:
            ValueError: when ked["t"] is not one of icp, dip, vcp, iss
            ValidationError: when a label required for the ilk is missing
        """
        ilk = ked["t"]
        if ilk not in (Ilks.icp, Ilks.dip, Ilks.vcp, Ilks.iss):
            raise ValueError("Nonincepting ilk={} for prefix derivation.".format(ilk))

        for label in getattr(Labels, ilk):
            if label not in ked:
                raise ValidationError("Missing element = {} from {} event for "
                                      "evt = {}.".format(label, ilk, ked))

        return self._verify(ked=ked, pre=self.qb64, prefixed=prefixed)

    def _derive_non_transferable(self, ked):
        """
        Returns tuple (raw, code) of basic nontransferable prefix (qb64)
            as derived from inception key event dict ked keys[0]

        Raises:
            DerivationError: when there is not exactly one key, the key code
                is not a non-transferable code, or any of the 'n', 'b', 'a'
                fields is non-empty
        """
        ked = dict(ked)  # make copy so don't clobber original ked
        try:
            keys = ked["k"]
            if len(keys) != 1:
                raise DerivationError("Basic derivation needs at most 1 key "
                                      " got {} keys instead".format(len(keys)))
            verfer = Verfer(qb64=keys[0])
        except Exception as ex:
            raise DerivationError("Error extracting public key ="
                                  " = {}".format(ex))

        if verfer.code not in (MtrDex.Ed25519N,
                               MtrDex.ECDSA_256r1N,
                               MtrDex.ECDSA_256k1N):
            raise DerivationError("Mismatch derivation code = {}."
                                  "".format(verfer.code))

        # verfer.code is known to be non-transferable from here on
        try:
            if ked["n"]:
                raise DerivationError("Non-empty nxt = {} for non-transferable"
                                      " code = {}".format(ked["n"],
                                                          verfer.code))

            if ked.get("b"):
                raise DerivationError("Non-empty b = {} for non-transferable"
                                      " code = {}".format(ked["b"],
                                                          verfer.code))

            if ked.get("a"):
                raise DerivationError("Non-empty a = {} for non-transferable"
                                      " code = {}".format(ked["a"],
                                                          verfer.code))
        except Exception as ex:
            raise DerivationError("Error checking nxt = {}".format(ex))

        return (verfer.raw, verfer.code)

    def _verify_non_transferable(self, ked, pre, prefixed=False):
        """
        Returns True if verified  False otherwise
        Verify derivation of fully qualified Base64 pre from inception ked dict

        Parameters:
            ked is inception key event dict
            pre is Base64 fully qualified prefix default to .qb64
            prefixed is Boolean, True means ked["i"] must also match pre
        """
        try:
            keys = ked["k"]
            if len(keys) != 1 or keys[0] != pre:
                return False

            if prefixed and ked["i"] != pre:
                return False

            return not ked["n"]  # nxt must be empty

        except Exception:
            return False

    def _derive_transferable(self, ked):
        """
        Returns tuple (raw, code) of basic transferable prefix (qb64)
            as derived from inception key event dict ked keys[0]

        Raises:
            DerivationError: when there is not exactly one key or the key
                code is not a transferable code
        """
        ked = dict(ked)  # make copy so don't clobber original ked
        try:
            keys = ked["k"]
            if len(keys) != 1:
                raise DerivationError("Basic derivation needs at most 1 key "
                                      " got {} keys instead".format(len(keys)))
            verfer = Verfer(qb64=keys[0])
        except Exception as ex:
            raise DerivationError("Error extracting public key ="
                                  " = {}".format(ex))

        if verfer.code not in (MtrDex.Ed25519,
                               MtrDex.ECDSA_256r1,
                               MtrDex.ECDSA_256k1):
            raise DerivationError("Mismatch derivation code = {}"
                                  "".format(verfer.code))

        return (verfer.raw, verfer.code)

    def _verify_transferable(self, ked, pre, prefixed=False):
        """
        Returns True if verified  False otherwise
        Verify derivation of fully qualified Base64 prefix from
        inception key event dict (ked)

        Parameters:
            ked is inception key event dict
            pre is Base64 fully qualified prefix default to .qb64
            prefixed is Boolean, True means ked["i"] must also match pre
        """
        try:
            keys = ked["k"]
            if len(keys) != 1 or keys[0] != pre:
                return False

            if prefixed and ked["i"] != pre:
                return False

        except Exception:
            return False

        return True

    def _derive_blake3_256(self, ked):
        """
        Returns tuple (raw, code) of pre (qb64) as blake3 digest
            as derived from inception key event dict ked

        Raises:
            DerivationError: when ked["t"] is not one of icp, dip, vcp, iss
        """
        ked = dict(ked)  # make copy so don't clobber original ked
        ilk = ked["t"]
        if ilk not in (Ilks.icp, Ilks.dip, Ilks.vcp, Ilks.iss):
            raise DerivationError("Invalid ilk = {} to derive pre.".format(ilk))

        # dummy pre of full size so serialized size is correct;
        # 'i' and 'd' must hold the same dummy
        dummy = self.Dummy * Matter.Sizes[MtrDex.Blake3_256].fs
        ked["i"] = dummy
        ked["d"] = dummy
        # sizeify returns (raw, proto, kind, ked, version); only raw is needed
        raw, _, _, _, _ = sizeify(ked=ked)
        dig = blake3.blake3(raw).digest()  # digest with dummy 'i' and 'd'
        return (dig, MtrDex.Blake3_256)  # dig is derived correct new 'i' and 'd'

    def _verify_blake3_256(self, ked, pre, prefixed=False):
        """
        Returns True if verified  False otherwise
        Verify derivation of fully qualified Base64 prefix from
        inception key event dict (ked)

        Parameters:
            ked is inception key event dict
            pre is Base64 fully qualified default to .qb64
            prefixed is Boolean, True means ked["i"] must also match pre
        """
        try:
            raw, _ = self._derive_blake3_256(ked=ked)  # derived with dummy 'i'
            derived = Matter(raw=raw, code=MtrDex.Blake3_256)
            if derived.qb64 != pre:  # derived raw with dummy 'i' must match pre
                return False

            if prefixed and ked["i"] != pre:  # incoming 'i' must match pre
                return False

            # when digestive then SAID must match pre
            return ked["i"] == ked["d"]

        except Exception:
            return False
# Describes one digest type: the digest algorithm class (klas), the digest size
# when not the default (size), and the digest length (length).
# size and length are needed for some digest types as function parameters.
Digestage = namedtuple("Digestage", ("klas", "size", "length"))
class Saider(Matter):
    """
    Saider is Matter subclass for self-addressing identifier prefix using
    derivation as determined by code from ked

    Properties: (inherited)
        code (str): derivation code to indicate cypher suite
        size (int): number of quadlets when variable sized material besides
                        full derivation code else None
        raw (bytes): crypto material only without code
        qb64 (str): Base64 fully qualified with derivation code + crypto mat
        qb64b (bytes): Base64 fully qualified with derivation code + crypto mat
        qb2  (bytes): binary with derivation code + crypto material
        transferable (bool): True means transferable derivation code False otherwise
        digestive (bool): True means digest derivation code False otherwise

    Hidden:
        _code (str): value for .code property
        _size (int): value for .size property
        _raw (bytes): value for .raw property
        _infil (types.MethodType): creates qb64b from .raw and .code
                                   (fully qualified Base64)
        _exfil (types.MethodType): extracts .code and .raw from qb64b
                                   (fully qualified Base64)
        _derive (types.MethodType): derives said (.qb64)
        _verify (types.MethodType): verifies said (.qb64) against a given sad

    """
    Dummy = "#"  # dummy spaceholder char for said. Must not be a valid Base64 char
    # should be same set of codes as in coring.DigestCodex coring.DigDex so
    # .digestive property works. Unit test ensures code sets match
    # Each entry maps a digest code to its Digestage: klas is the hash class,
    # size (when set) is passed to klas as digest_size and length (when set)
    # is passed to .digest() as length.
    Digests = {
        MtrDex.Blake3_256: Digestage(klas=blake3.blake3, size=None, length=None),
        MtrDex.Blake2b_256: Digestage(klas=hashlib.blake2b, size=32, length=None),
        MtrDex.Blake2s_256: Digestage(klas=hashlib.blake2s, size=None, length=None),
        MtrDex.SHA3_256: Digestage(klas=hashlib.sha3_256, size=None, length=None),
        MtrDex.SHA2_256: Digestage(klas=hashlib.sha256, size=None, length=None),
        MtrDex.Blake3_512: Digestage(klas=blake3.blake3, size=None, length=64),
        MtrDex.Blake2b_512: Digestage(klas=hashlib.blake2b, size=None, length=None),
        MtrDex.SHA3_512: Digestage(klas=hashlib.sha3_512, size=None, length=None),
        MtrDex.SHA2_512: Digestage(klas=hashlib.sha512, size=None, length=None),
    }
def __init__(self, raw=None, *, code=None, sad=None, kind=None,
             label=Saids.d, ignore=None, **kwa):
    """
    Create from provided material or, when that is missing, derive the said
    from sad.

    See Matter.__init__ for inherited parameters

    Parameters:
        sad (dict): self addressed data to serialize and inject said
        kind (str): serialization algorithm of sad, one of Serials
                    used to override that given by 'v' field if any in sad
                    otherwise default is Serials.json
        label (str): Saidage value as said field label
        ignore (list): fields to ignore when generating SAID

    Raises:
        EmptyMaterialError: re-raised when sad is missing or has no label field
        ValueError: when code is not in DigDex or .digestive is False
    """
    try:
        # succeeds when raw and code are both provided
        super().__init__(raw=raw, code=code, **kwa)
    except EmptyMaterialError:  # raw or code missing
        # need sad with label field to calculate raw or qb64
        if not (sad and label in sad):
            raise

        if not code:
            said = sad[label]
            if said:  # no code but said field not empty
                # attempt to get code from said in sad
                super().__init__(qb64=said, code=code, **kwa)
                code = self._code
            else:
                code = MtrDex.Blake3_256  # default code

        if code not in DigDex:  # need valid code
            raise ValueError("Unsupported digest code = {}.".format(code))

        # derive on a copy of sad since derivation sets the sad[label]
        # and sad['v'] fields
        raw, sad = self.derive(sad=dict(sad),
                               code=code,
                               kind=kind,
                               label=label,
                               ignore=ignore)
        super().__init__(raw=raw, code=code, **kwa)

    if not self.digestive:
        raise ValueError("Unsupported digest code = {}."
                         "".format(self.code))
@classmethod
def _serialize(clas, sad: dict, kind: str = None):
    """
    Serialize sad with serialization kind if provided, else with the kind
    from the embedded 'v' version string if present, else with the default
    Serials.json

    Returns:
       ser (bytes): raw serialization of sad

    Parameters:
       sad (dict): serializable dict
       kind (str): serialization algorithm of sad, one of Serials
                    used to override that given by 'v' field if any in sad
                    otherwise default is Serials.json
    """
    if 'v' in sad:  # versioned sad
        _, _, fallback, _ = deversify(sad['v'])
    else:
        fallback = Serials.json

    # match logic of Serder for kind: explicit kind wins
    return dumps(sad, kind=kind if kind else fallback)
@classmethod
def saidify(clas,
            sad: dict,
            *,
            code: str = MtrDex.Blake3_256,
            kind: str = None,
            label: str = Saids.d,
            ignore: list = None,
            **kwa):
    """
    Derives said from sad and injects it into a copy of sad.

    Returns:
        result (tuple): of the form (saider, sad) where saider is Saider
                instance generated from sad using code and sad is copy of
                parameter sad but with its label id field filled
                in with generated said from saider

    Parameters:
        sad (dict): serializable dict
        code (str): digest type code from DigDex
        kind (str): serialization algorithm of sad, one of Serials
                    used to override that given by 'v' field if any in sad
                    otherwise default is Serials.json
        label (str): Saidage value as said field label in which to inject said
        ignore (list): fields to ignore when generating SAID

    Raises:
        KeyError: when sad has no field named label
    """
    if label not in sad:
        raise KeyError("Missing id field labeled={} in sad.".format(label))

    # options shared by the derivation and the saider construction
    opts = dict(code=code, kind=kind, label=label, ignore=ignore)
    raw, filled = clas._derive(sad=sad, **opts)
    saider = clas(raw=raw, **opts, **kwa)
    filled[label] = saider.qb64
    return saider, filled
@classmethod
def _derive(clas, sad: dict, *,
            code: str = MtrDex.Blake3_256,
            kind: str = None,
            label: str = Saids.d,
            ignore: list = None):
    """
    Derive the raw said digest of sad with sad[label] filled by .Dummy chars.

    Returns:
        result (tuple): (raw, sad) where raw (bytes) is the digest of the
            serialized dummy-filled sad and sad is the dummy-filled copy
            (with resized 'v' field when versioned)

    Parameters:
        sad (dict): self addressed data to be injected with dummy and serialized
        code (str): digest type code from DigDex
        kind (str): serialization algorithm of sad, one of Serials
                    used to override that given by 'v' field if any in sad
                    otherwise default is Serials.json
        label (str): Saidage value as said field label in which to inject dummy
        ignore (list): fields to ignore when generating SAID

    Raises:
        ValueError: when code is not in DigDex or not in .Digests
        KeyError: when a field named in ignore is absent from sad
    """
    supported = code in DigDex and code in clas.Digests
    if not supported:
        raise ValueError("Unsupported digest code = {}.".format(code))

    sad = dict(sad)  # shallow copy so the caller's sad is not clobbered
    # placeholder of the said's full size so the serialized length is right
    sad[label] = clas.Dummy * Matter.Sizes[code].fs

    if 'v' in sad:
        # versioned: size field of version string must be set;
        # kind is replaced by the kind that sizeify reports
        _, _, kind, sad, _ = sizeify(ked=sad, kind=kind)

    # ignored fields are dropped only from what gets digested,
    # the returned sad keeps them
    ser = dict(sad)
    for field in (ignore or ()):
        del ser[field]

    klas, size, length = clas.Digests[code]
    ckwa = dict(digest_size=size) if size else dict()  # hasher class kwargs
    dkwa = dict(length=length) if length else dict()  # .digest() kwargs

    hasher = klas(clas._serialize(ser, kind=kind), **ckwa)
    return hasher.digest(**dkwa), sad  # raw digest and sad
def derive(self, sad, code=None, **kwa):
    """
    Instance level wrapper of ._derive that defaults code to own .code.

    Returns:
        result (tuple): (raw, sad) raw said as derived from serialized dict
                        and modified sad during derivation.

    Parameters:
        sad (dict): self addressed data to be serialized
        code (str): digest type code from DigDex. None means use self.code
        kind (str): serialization algorithm of sad, one of Serials
                    used to override that given by 'v' field if any in sad
                    otherwise default is Serials.json
        label (str): Saidage value of said field label in which to inject dummy
    """
    if code is None:  # only None triggers the default, not other falsy values
        code = self.code
    return self._derive(sad=sad, code=code, **kwa)
def verify(self, sad, *, prefixed=False, versioned=True, code=None,
           kind=None, label=Saids.d, ignore=None, **kwa):
    """
    Check that the said derived from sad matches own .qb64.

    Returns:
        result (bool): True means derivation from sad with dummy label
            field value replacement for ._code matches .qb64. False otherwise
            If prefixed is True then also validates that label field of
            provided sad also matches .qb64. False otherwise
            If versioned is True and provided sad includes version field 'v'
            then also validates that version field 'v' of provided sad
            matches the version field of modified sad that results from the
            derivation process. The size chars in the version field are set
            to the size of the sad during derivation. False otherwise.
            Any exception raised while checking also yields False.

    Parameters:
        sad (dict): self addressed data to be serialized
        prefixed (bool): True means also verify if labeled field in
            sad matches own .qb64
        versioned (bool): True means also verify the 'v' field if present
        code (str): digest type code from DigDex. Not used, own .code
            always overrides it
        kind (str): serialization algorithm of sad, one of Serials
                    used to override that given by 'v' field if any in sad
                    otherwise default is Serials.json
        label (str): Saidage value of said field label in which to inject dummy
        ignore (list): fields to ignore when generating SAID
    """
    try:
        # override: derivation always uses self.code
        raw, derived = self._derive(sad=sad, code=self.code, kind=kind,
                                    label=label, ignore=ignore)
        other = Saider(raw=raw, code=self.code, ignore=ignore, **kwa)

        if other.qb64b != self.qb64b:
            return False  # derived said differs from own

        version_mismatch = ('v' in sad and versioned
                            and sad['v'] != derived['v'])
        if version_mismatch:
            return False  # version fields not match

        label_mismatch = prefixed and sad[label] != self.qb64
        if label_mismatch:
            return False  # label id field not match .qb64

    except Exception:
        # any failure during derivation or comparison means not verified
        return False

    return True
@dataclass(frozen=True)
class IndexerCodex:
    """
    IndexerCodex is codex hard (stable) part of all indexer derivation codes.

    Codes indicate which list of keys, current and/or prior next, index is for:

        _Sig:       Indices in code may appear in both current signing and
                    prior next key lists when event has both current and
                    prior next key lists. Two character code table has only
                    one index so must be the same for both lists. Other index
                    is for prior next. The indices may be different in those
                    code tables which have two sets of indices.

        _Crt_Sig:   Index in code for current signing key list only.

        _Big_:      Big index values

    Only provide defined codes.
    Undefined are left out so that inclusion(exclusion) via 'in' operator works.
    """
    # one char codes
    Ed25519_Sig: str = 'A'  # Ed25519, same index in both lists if any
    Ed25519_Crt_Sig: str = 'B'  # Ed25519, current list only
    ECDSA_256k1_Sig: str = 'C'  # ECDSA secp256k1, same index in both lists if any
    ECDSA_256k1_Crt_Sig: str = 'D'  # ECDSA secp256k1, current list
    ECDSA_256r1_Sig: str = 'E'  # ECDSA secp256r1, same index in both lists if any
    ECDSA_256r1_Crt_Sig: str = 'F'  # ECDSA secp256r1, current list
    # two char codes
    Ed448_Sig: str = '0A'  # Ed448, both lists
    Ed448_Crt_Sig: str = '0B'  # Ed448, current list only
    # big index variants
    Ed25519_Big_Sig: str = '2A'  # Ed25519, both lists
    Ed25519_Big_Crt_Sig: str = '2B'  # Ed25519, current list only
    ECDSA_256k1_Big_Sig: str = '2C'  # ECDSA secp256k1, both lists
    ECDSA_256k1_Big_Crt_Sig: str = '2D'  # ECDSA secp256k1, current list only
    ECDSA_256r1_Big_Sig: str = '2E'  # ECDSA secp256r1, both lists
    ECDSA_256r1_Big_Crt_Sig: str = '2F'  # ECDSA secp256r1, current list only
    Ed448_Big_Sig: str = '3A'  # Ed448, both lists
    Ed448_Big_Crt_Sig: str = '3B'  # Ed448, current list only
    # test codes
    TBD0: str = '0z'  # Test of Var len label L=N*4 <= 4095 char quadlets includes code
    TBD1: str = '1z'  # Test of index sig lead 1
    TBD4: str = '4z'  # Test of index sig lead 1 big

    def __iter__(self):
        """Iterate over the code values, which enables inclusion test with 'in'."""
        return iter(astuple(self))
IdrDex = IndexerCodex()  # Make instance, shared codex of all indexer codes
@dataclass(frozen=True)
class IndexedSigCodex:
    """IndexedSigCodex is codex all indexed signature derivation codes.

    Only provide defined codes.
    Undefined are left out so that inclusion(exclusion) via 'in' operator works.
    """
    # one char codes
    Ed25519_Sig: str = 'A'  # Ed25519, same index in both lists if any
    Ed25519_Crt_Sig: str = 'B'  # Ed25519, current list only
    ECDSA_256k1_Sig: str = 'C'  # ECDSA secp256k1, same index in both lists if any
    ECDSA_256k1_Crt_Sig: str = 'D'  # ECDSA secp256k1, current list
    ECDSA_256r1_Sig: str = 'E'  # ECDSA secp256r1, same index in both lists if any
    ECDSA_256r1_Crt_Sig: str = 'F'  # ECDSA secp256r1, current list
    # two char codes
    Ed448_Sig: str = '0A'  # Ed448, both lists
    Ed448_Crt_Sig: str = '0B'  # Ed448, current list only
    # big index variants
    Ed25519_Big_Sig: str = '2A'  # Ed25519, both lists
    Ed25519_Big_Crt_Sig: str = '2B'  # Ed25519, current list only
    ECDSA_256k1_Big_Sig: str = '2C'  # ECDSA secp256k1, both lists
    ECDSA_256k1_Big_Crt_Sig: str = '2D'  # ECDSA secp256k1, current list only
    ECDSA_256r1_Big_Sig: str = '2E'  # ECDSA secp256r1, both lists
    ECDSA_256r1_Big_Crt_Sig: str = '2F'  # ECDSA secp256r1, current list only
    Ed448_Big_Sig: str = '3A'  # Ed448, both lists
    Ed448_Big_Crt_Sig: str = '3B'  # Ed448, current list only

    def __iter__(self):
        """Iterate over the code values, which enables inclusion test with 'in'."""
        return iter(astuple(self))
IdxSigDex = IndexedSigCodex()  # Make instance, codex of all indexed signature codes
@dataclass(frozen=True)
class IndexedCurrentSigCodex:
    """IndexedCurrentSigCodex is codex indexed signature codes for current list.

    Only provide defined codes.
    Undefined are left out so that inclusion(exclusion) via 'in' operator works.
    """
    Ed25519_Crt_Sig: str = 'B'  # Ed25519, current list only
    ECDSA_256k1_Crt_Sig: str = 'D'  # ECDSA secp256k1, current list only
    ECDSA_256r1_Crt_Sig: str = 'F'  # ECDSA secp256r1, current list
    Ed448_Crt_Sig: str = '0B'  # Ed448, current list only
    # big index variants
    Ed25519_Big_Crt_Sig: str = '2B'  # Ed25519, current list only
    ECDSA_256k1_Big_Crt_Sig: str = '2D'  # ECDSA secp256k1, current list only
    ECDSA_256r1_Big_Crt_Sig: str = '2F'  # ECDSA secp256r1, current list only
    Ed448_Big_Crt_Sig: str = '3B'  # Ed448, current list only

    def __iter__(self):
        """Iterate over the code values, which enables inclusion test with 'in'."""
        return iter(astuple(self))
IdxCrtSigDex = IndexedCurrentSigCodex()  # Make instance, codex of current-list-only codes
@dataclass(frozen=True)
class IndexedBothSigCodex:
    """IndexedBothSigCodex is codex indexed signature codes for both lists.

    Only provide defined codes.
    Undefined are left out so that inclusion(exclusion) via 'in' operator works.
    """
    Ed25519_Sig: str = 'A'  # Ed25519, same index in both lists if any
    ECDSA_256k1_Sig: str = 'C'  # ECDSA secp256k1, same index in both lists if any
    ECDSA_256r1_Sig: str = 'E'  # ECDSA secp256r1, same index in both lists if any
    Ed448_Sig: str = '0A'  # Ed448, both lists
    # big index variants
    Ed25519_Big_Sig: str = '2A'  # Ed25519, both lists
    ECDSA_256k1_Big_Sig: str = '2C'  # ECDSA secp256k1, both lists
    ECDSA_256r1_Big_Sig: str = '2E'  # ECDSA secp256r1, both lists
    Ed448_Big_Sig: str = '3A'  # Ed448, both lists

    def __iter__(self):
        """Iterate over the code values, which enables inclusion test with 'in'."""
        return iter(astuple(self))
IdxBthSigDex = IndexedBothSigCodex() # Make instance # namedtuple for size entries in Incexer derivation code tables # hs is the hard size int number of chars in hard (stable) part of code # ss is the soft size int number of chars in soft (unstable) part of code # os is the other size int number of chars in other index part of soft # ms = ss - os main index size computed # fs is the full size int number of chars in code plus appended material if any # ls is the lead size int number of bytes to pre-pad pre-converted raw binary Xizage = namedtuple("Xizage", "hs ss os fs ls")
class Indexer:
    """
    Indexer is fully qualified cryptographic material primitive base class for
    indexed primitives. In special cases some codes in the Index code table
    may be of variable length (i.e. not indexed) when the full size table entry
    is None. In that case the index is used instead as the length.

    Sub classes are derivation code and key event element context specific.

    Includes the following attributes and properties:

    Attributes:

    Properties:
        code (str): stable (hard) part of derivation code
        raw (bytes): unqualified crypto material usable for crypto operations
        index (int): main index offset into list or length of material
        ondex (int | None): other index offset into list or length of material
        qb64b (bytes): fully qualified Base64 crypto material
        qb64 (str): fully qualified Base64 crypto material
        qb2 (bytes): fully qualified binary crypto material

    Hidden:
        ._code (str): value for .code property
        ._raw (bytes): value for .raw property
        ._index (int): value for .index property
        ._ondex (int | None): value for .ondex property
        ._infil is method to compute fully qualified Base64 from .raw and .code
        ._binfil is method to compute fully qualified Base2 from .raw and .code
        ._exfil is method to extract .code and .raw from fully qualified Base64
        ._bexfil is method to extract .code and .raw from fully qualified Base2
    """
    Codex = IdrDex

    # Hards table maps from Base64 first code char to int of hard size, hs,
    # (stable) of code. The soft size, ss, (unstable) is always > 0 for Indexer.
    Hards = ({chr(c): 1 for c in range(65, 65 + 26)})  # 'A'..'Z'
    Hards.update({chr(c): 1 for c in range(97, 97 + 26)})  # 'a'..'z'
    Hards.update([('0', 2), ('1', 2), ('2', 2), ('3', 2), ('4', 2)])

    # Sizes table maps hs chars of code to Xizage namedtuple of (hs, ss, os, fs, ls)
    # where hs is hard size, ss is soft size, os is other index size,
    # and fs is full size, ls is lead size.
    # where ss includes os, so main index size ms = ss - os
    # soft size, ss, should always be > 0 for Indexer
    # fs of None means variable length with size computed from the index
    Sizes = {
        'A': Xizage(hs=1, ss=1, os=0, fs=88, ls=0),
        'B': Xizage(hs=1, ss=1, os=0, fs=88, ls=0),
        'C': Xizage(hs=1, ss=1, os=0, fs=88, ls=0),
        'D': Xizage(hs=1, ss=1, os=0, fs=88, ls=0),
        'E': Xizage(hs=1, ss=1, os=0, fs=88, ls=0),
        'F': Xizage(hs=1, ss=1, os=0, fs=88, ls=0),
        '0A': Xizage(hs=2, ss=2, os=1, fs=156, ls=0),
        '0B': Xizage(hs=2, ss=2, os=1, fs=156, ls=0),
        '2A': Xizage(hs=2, ss=4, os=2, fs=92, ls=0),
        '2B': Xizage(hs=2, ss=4, os=2, fs=92, ls=0),
        '2C': Xizage(hs=2, ss=4, os=2, fs=92, ls=0),
        '2D': Xizage(hs=2, ss=4, os=2, fs=92, ls=0),
        '2E': Xizage(hs=2, ss=4, os=2, fs=92, ls=0),
        '2F': Xizage(hs=2, ss=4, os=2, fs=92, ls=0),
        '3A': Xizage(hs=2, ss=6, os=3, fs=160, ls=0),
        '3B': Xizage(hs=2, ss=6, os=3, fs=160, ls=0),
        '0z': Xizage(hs=2, ss=2, os=0, fs=None, ls=0),
        '1z': Xizage(hs=2, ss=2, os=1, fs=76, ls=1),
        '4z': Xizage(hs=2, ss=6, os=3, fs=80, ls=1),
    }

    # Bards table maps to hard size, hs, of code from bytes holding sextets
    # converted from first code char. Used for ._bexfil.
    Bards = ({codeB64ToB2(c): hs for c, hs in Hards.items()})

    def __init__(self, raw=None, code=IdrDex.Ed25519_Sig, index=0, ondex=None,
                 qb64b=None, qb64=None, qb2=None, strip=False):
        """
        Validate as fully qualified

        Parameters:
            raw (bytes): unqualified crypto material usable for crypto operations
            code (str): stable (hard) part of derivation code
            index (int): main index offset into list or length of material
            ondex (int | None): other index offset into list or length of material
            qb64b (bytes): fully qualified Base64 crypto material
            qb64 (str | bytes): fully qualified Base64 crypto material
            qb2 (bytes): fully qualified binary crypto material
            strip (bool): True means strip the parsed material from the input
                            stream bytearray after parsing qb64b or qb2.
                            False means do not strip

        Needs either (raw and code and index) or qb64b or qb64 or qb2
        Otherwise raises EmptyMaterialError
        When raw and code provided then validate that code is correct for
        length of raw and assign .raw
        Else when qb64b or qb64 or qb2 provided extract and assign
        .raw, .code, .index, .ondex.

        Raises:
            EmptyMaterialError: no usable combination of parameters
            TypeError: raw is not bytes or bytearray
            UnexpectedCodeError: code not in .Sizes
            InvalidVarIndexError: index or ondex out of range or inconsistent
            InvalidCodeSizeError: variable length code with malformed sizes
            RawMaterialError: raw shorter than required by code and index
        """
        if raw is not None:  # raw provided
            if not code:
                raise EmptyMaterialError("Improper initialization need either "
                                         "(raw and code) or qb64b or qb64 or qb2.")
            if not isinstance(raw, (bytes, bytearray)):
                raise TypeError(f"Not a bytes or bytearray, raw={raw}.")

            if code not in self.Sizes:
                raise UnexpectedCodeError(f"Unsupported code={code}.")

            hs, ss, os, fs, ls = self.Sizes[code]  # get sizes for code
            cs = hs + ss  # both hard + soft code size
            ms = ss - os  # main index size

            if not isinstance(index, int) or index < 0 or index > (64 ** ms - 1):
                raise InvalidVarIndexError(f"Invalid index={index} for code={code}.")

            if isinstance(ondex, int) and os and not (ondex >= 0 and ondex <= (64 ** os - 1)):
                raise InvalidVarIndexError(f"Invalid ondex={ondex} for code={code}.")

            if code in IdxCrtSigDex and ondex is not None:
                raise InvalidVarIndexError(f"Non None ondex={ondex} for code={code}.")

            if code in IdxBthSigDex:
                if ondex is None:  # set default
                    ondex = index  # when not provided make ondex match index
                else:
                    if ondex != index and os == 0:  # must match if os == 0
                        raise InvalidVarIndexError(f"Non matching ondex={ondex}"
                                                   f" and index={index} for "
                                                   f"code={code}.")

            if not fs:  # compute fs from index
                if cs % 4:
                    raise InvalidCodeSizeError(f"Whole code size not multiple of 4 for "
                                               f"variable length material. cs={cs}.")
                if os != 0:
                    raise InvalidCodeSizeError(f"Non-zero other index size for "
                                               f"variable length material. os={os}.")
                fs = (index * 4) + cs

            rawsize = (fs - cs) * 3 // 4

            raw = raw[:rawsize]  # copy rawsize from stream, may be less
            if len(raw) != rawsize:  # forbids shorter
                raise RawMaterialError(f"Not enough raw bytes for code={code} "
                                       f"and index={index}, expected {rawsize} "
                                       f"got {len(raw)}.")

            self._code = code
            self._index = index
            self._ondex = ondex
            self._raw = bytes(raw)  # crypto ops require bytes not bytearray

        elif qb64b is not None:
            self._exfil(qb64b)
            if strip:  # assumes bytearray
                del qb64b[:len(self.qb64b)]  # may be variable length fs

        elif qb64 is not None:
            self._exfil(qb64)

        elif qb2 is not None:
            self._bexfil(qb2)
            if strip:  # assumes bytearray
                del qb2[:len(self.qb2)]  # may be variable length fs

        else:
            raise EmptyMaterialError("Improper initialization need either "
                                     "(raw and code and index) or qb64b or "
                                     "qb64 or qb2.")

    @classmethod
    def _rawSize(cls, code):
        """
        Returns expected raw size in bytes for a given code. Not applicable to
        codes with fs = None
        """
        hs, ss, os, fs, ls = cls.Sizes[code]  # get sizes
        return ((fs - (hs + ss)) * 3 // 4)

    @property
    def code(self):
        """
        Returns ._code
        Makes .code read only
        """
        return self._code

    @property
    def raw(self):
        """
        Returns ._raw
        Makes .raw read only
        """
        return self._raw

    @property
    def index(self):
        """
        Returns ._index
        Makes .index read only
        """
        return self._index

    @property
    def ondex(self):
        """
        Returns ._ondex
        Makes .ondex read only
        """
        return self._ondex

    @property
    def qb64b(self):
        """
        Property qb64b:
        Returns Fully Qualified Base64 Version encoded as bytes
        Assumes self.raw and self.code are correctly populated
        """
        return self._infil()

    @property
    def qb64(self):
        """
        Property qb64:
        Returns Fully Qualified Base64 Version
        Assumes self.raw and self.code are correctly populated
        """
        return self.qb64b.decode("utf-8")

    @property
    def qb2(self):
        """
        Property qb2:
        Returns Fully Qualified Binary Version Bytes
        """
        return self._binfil()

    def _infil(self):
        """
        Returns fully qualified attached sig base64 bytes computed from
        self.raw, self.code and self.index.

        cs = hs + ss
        os = ss - ms (main index size)
        when fs None then size computed & fs = size * 4 + cs

        Raises:
            InvalidCodeSizeError: code, pad or full sizes inconsistent with table
            InvalidVarIndexError: index or ondex out of range for code
        """
        code = self.code  # codex value chars hard code
        index = self.index  # main index value
        ondex = self.ondex  # other index value
        raw = self.raw  # bytes or bytearray

        ps = (3 - (len(raw) % 3)) % 3  # if lead then same pad size chars & lead size bytes
        hs, ss, os, fs, ls = self.Sizes[code]
        cs = hs + ss
        ms = ss - os

        if not fs:  # compute fs from index
            if cs % 4:
                raise InvalidCodeSizeError(f"Whole code size not multiple of 4 for "
                                           f"variable length material. cs={cs}.")
            if os != 0:
                raise InvalidCodeSizeError(f"Non-zero other index size for "
                                           f"variable length material. os={os}.")
            fs = (index * 4) + cs

        if index < 0 or index > (64 ** ms - 1):
            raise InvalidVarIndexError(f"Invalid index={index} for code={code}.")

        if (isinstance(ondex, int) and os and
                not (ondex >= 0 and ondex <= (64 ** os - 1))):
            raise InvalidVarIndexError(f"Invalid ondex={ondex} for os={os} and "
                                       f"code={code}.")

        # both is hard code + converted index + converted ondex
        both = (f"{code}{intToB64(index, l=ms)}"
                f"{intToB64(ondex if ondex is not None else 0, l=os)}")

        # check valid pad size for whole code size, assumes ls is zero
        if len(both) != cs:
            raise InvalidCodeSizeError("Mismatch code size = {} with table = {}."
                                       .format(cs, len(both)))

        if (cs % 4) != ps - ls:  # adjusted pad given lead bytes
            raise InvalidCodeSizeError(f"Invalid code={both} for converted"
                                       f" raw pad size={ps}.")

        # prepend pad bytes, convert, then replace pad chars with full derivation
        # code including index,
        full = both.encode("utf-8") + encodeB64(bytes([0] * ps) + raw)[ps - ls:]

        if len(full) != fs:  # invalid size
            raise InvalidCodeSizeError(f"Invalid code={both} for raw size={len(raw)}.")

        return full

    def _binfil(self):
        """
        Returns bytes of fully qualified base2 bytes, that is .qb2
        self.code and self.index converted to Base2 + self.raw left shifted
        with pad bits equivalent of Base64 decode of .qb64 into .qb2

        Raises:
            InvalidCodeSizeError: code, pad or full sizes inconsistent with table
            InvalidVarIndexError: index or ondex out of range for code
        """
        code = self.code  # codex chars hard code
        index = self.index  # main index value
        ondex = self.ondex  # other index value
        raw = self.raw  # bytes or bytearray

        ps = (3 - (len(raw) % 3)) % 3  # same pad size chars & lead size bytes
        hs, ss, os, fs, ls = self.Sizes[code]
        cs = hs + ss
        ms = ss - os

        # main index only occupies ms chars of the soft part, so bound by ms
        # (same bound as used by .__init__ and ._infil)
        if index < 0 or index > (64 ** ms - 1):
            raise InvalidVarIndexError(f"Invalid index={index} for code={code}.")

        if (isinstance(ondex, int) and os and
                not (ondex >= 0 and ondex <= (64 ** os - 1))):
            raise InvalidVarIndexError(f"Invalid ondex={ondex} for os={os} and "
                                       f"code={code}.")

        if not fs:  # compute fs from index
            if cs % 4:
                raise InvalidCodeSizeError(f"Whole code size not multiple of 4 for "
                                           f"variable length material. cs={cs}.")
            if os != 0:
                raise InvalidCodeSizeError(f"Non-zero other index size for "
                                           f"variable length material. os={os}.")
            fs = (index * 4) + cs

        # both is hard code + converted index + converted ondex
        both = (f"{code}{intToB64(index, l=ms)}"
                f"{intToB64(ondex if ondex is not None else 0, l=os)}")

        if len(both) != cs:
            raise InvalidCodeSizeError("Mismatch code size = {} with table = {}."
                                       .format(cs, len(both)))

        if (cs % 4) != ps - ls:  # adjusted pad given lead bytes
            raise InvalidCodeSizeError(f"Invalid code={both} for converted"
                                       f" raw pad size={ps}.")

        n = sceil(cs * 3 / 4)  # number of b2 bytes to hold b64 code + index
        # convert code both to right align b2 int then left shift in pad bits
        # then convert to bytes
        bcode = (b64ToInt(both) << (2 * (ps - ls))).to_bytes(n, 'big')
        full = bcode + bytes([0] * ls) + raw

        bfs = len(full)  # binary full size
        if bfs % 3 or (bfs * 4 // 3) != fs:  # invalid size
            raise InvalidCodeSizeError(f"Invalid code={both} for raw size={len(raw)}.")

        return full

    def _exfil(self, qb64b):
        """
        Extracts self.code, self.index, self.ondex and self.raw from qualified
        base64 str or bytes qb64b

        cs = hs + ss
        ms = ss - os (main index size)
        when fs None then size computed & fs = size * 4 + cs

        Raises:
            ShortageError: not enough chars in qb64b
            UnexpectedCountCodeError: material starts with count code char '-'
            UnexpectedOpCodeError: material starts with op code char '_'
            UnexpectedCodeError: unsupported start char or hard code
            ValidationError: variable length code with malformed sizes
            ValueError: non zero ondex for current only code, or non zeroed
                pad bits or lead bytes
            ConversionError: decoded raw length inconsistent with material
        """
        if not qb64b:  # empty need more bytes
            raise ShortageError("Empty material.")

        first = qb64b[:1]  # extract first char code selector
        if hasattr(first, "decode"):
            first = first.decode("utf-8")
        if first not in self.Hards:
            if first[0] == '-':
                raise UnexpectedCountCodeError("Unexpected count code start "
                                               "while extracting Indexer.")
            elif first[0] == '_':
                raise UnexpectedOpCodeError("Unexpected op code start "
                                            "while extracting Indexer.")
            else:
                raise UnexpectedCodeError(f"Unsupported code start char={first}.")

        hs = self.Hards[first]  # get hard code size
        if len(qb64b) < hs:  # need more bytes
            raise ShortageError(f"Need {hs - len(qb64b)} more characters.")

        hard = qb64b[:hs]  # get hard code
        if hasattr(hard, "decode"):
            hard = hard.decode("utf-8")
        if hard not in self.Sizes:
            raise UnexpectedCodeError(f"Unsupported code ={hard}.")

        hs, ss, os, fs, ls = self.Sizes[hard]  # assumes hs in both tables consistent
        cs = hs + ss  # both hard + soft code size
        ms = ss - os
        # assumes that unit tests on Indexer and IndexerCodex ensure that
        # .Codes and .Sizes are well formed.
        # hs consistent and hs > 0 and ss > 0 and (fs >= hs + ss if fs is not None else True)

        if len(qb64b) < cs:  # need more bytes
            raise ShortageError(f"Need {cs - len(qb64b)} more characters.")

        index = qb64b[hs:hs+ms]  # extract index/size chars
        if hasattr(index, "decode"):
            index = index.decode("utf-8")
        index = b64ToInt(index)  # compute int index

        ondex = qb64b[hs+ms:hs+ms+os]  # extract ondex chars
        if hasattr(ondex, "decode"):
            ondex = ondex.decode("utf-8")

        if hard in IdxCrtSigDex:  # if current sig then ondex from code must be 0
            ondex = b64ToInt(ondex) if os else None  # compute ondex from code
            if ondex:  # not zero or None so error
                raise ValueError(f"Invalid ondex={ondex} for code={hard}.")
            else:
                ondex = None  # zero so set to None when current only
        else:
            ondex = b64ToInt(ondex) if os else index

        # index is index for some codes and variable length for others
        if not fs:  # compute fs from index which means variable length
            if cs % 4:
                raise ValidationError(f"Whole code size not multiple of 4 for "
                                      f"variable length material. cs={cs}.")
            if os != 0:
                raise ValidationError(f"Non-zero other index size for "
                                      f"variable length material. os={os}.")
            fs = (index * 4) + cs

        if len(qb64b) < fs:  # need more bytes
            raise ShortageError(f"Need {fs - len(qb64b)} more chars.")

        qb64b = qb64b[:fs]  # fully qualified primitive code plus material
        if hasattr(qb64b, "encode"):  # only convert extracted chars from stream
            qb64b = qb64b.encode("utf-8")

        # check for non-zeroed pad bits or lead bytes
        ps = cs % 4  # code pad size ps = cs mod 4
        pbs = 2 * (ps if ps else ls)  # pad bit size in bits
        if ps:  # ps. IF ps THEN not ls (lead) and vice versa OR not ps and not ls
            base = ps * b'A' + qb64b[cs:]  # replace pre code with prepad chars of zero
            paw = decodeB64(base)  # decode base to leave prepadded raw
            pi = (int.from_bytes(paw[:ps], "big"))  # prepad as int
            if pi & (2 ** pbs - 1):  # masked pad bits non-zero
                raise ValueError(f"Non zeroed prepad bits = "
                                 f"{pi & (2 ** pbs - 1 ):<06b} in {qb64b[cs:cs+1]}.")
            raw = paw[ps:]  # strip off ps prepad paw bytes
        else:  # not ps. IF not ps THEN may or may not be ls (lead)
            base = qb64b[cs:]  # strip off code leaving lead chars if any and value
            # decode lead chars + val leaving lead bytes + raw bytes
            # then strip off ls lead bytes leaving raw
            paw = decodeB64(base)  # decode base to leave prepadded paw bytes
            li = int.from_bytes(paw[:ls], "big")  # lead as int
            if li:  # pre pad lead bytes must be zero
                if ls == 1:
                    raise ValueError(f"Non zeroed lead byte = 0x{li:02x}.")
                else:
                    raise ValueError(f"Non zeroed lead bytes = 0x{li:04x}.")
            raw = paw[ls:]

        if len(raw) != (len(qb64b) - cs) * 3 // 4:  # exact lengths
            raise ConversionError(f"Improperly qualified material = {qb64b}")

        self._code = hard
        self._index = index
        self._ondex = ondex
        self._raw = raw  # must be bytes for crypto ops and immutable not bytearray

    def _bexfil(self, qb2):
        """
        Extracts self.code, self.index, self.ondex and self.raw from qualified
        base2 bytes qb2

        cs = hs + ss
        ms = ss - os (main index size)
        when fs None then size computed & fs = size * 4 + cs

        Raises:
            ShortageError: not enough bytes in qb2
            UnexpectedCountCodeError: material starts with count code sextet
            UnexpectedOpCodeError: material starts with op code sextet
            UnexpectedCodeError: unsupported start sextet or hard code
            ValidationError: variable length code with malformed sizes
            ValueError: non zero ondex for current only code, or non zeroed
                pad bits or lead bytes
            ConversionError: extracted raw length inconsistent with material
        """
        if not qb2:  # empty need more bytes
            raise ShortageError("Empty material, Need more bytes.")

        first = nabSextets(qb2, 1)  # extract first sextet as code selector
        if first not in self.Bards:
            # Compare one byte slices, not first[0]: indexing bytes gives an
            # int which never equals a bytes literal.
            # NOTE(review): assumes nabSextets returns bytes-like — confirm
            if first[:1] == b'\xf8':  # b64ToB2('-')
                raise UnexpectedCountCodeError("Unexpected count code start "
                                               "while extracting Indexer.")
            elif first[:1] == b'\xfc':  # b64ToB2('_')
                raise UnexpectedOpCodeError("Unexpected op code start "
                                            "while extracting Indexer.")
            else:
                raise UnexpectedCodeError(f"Unsupported code start sextet={first}.")

        hs = self.Bards[first]  # get code hard size equivalent sextets
        bhs = sceil(hs * 3 / 4)  # bhs is min bytes to hold hs sextets
        if len(qb2) < bhs:  # need more bytes
            raise ShortageError(f"Need {bhs - len(qb2)} more bytes.")

        hard = codeB2ToB64(qb2, hs)  # extract and convert hard part of code
        if hard not in self.Sizes:
            raise UnexpectedCodeError(f"Unsupported code ={hard}.")

        hs, ss, os, fs, ls = self.Sizes[hard]
        cs = hs + ss  # both hs and ss
        ms = ss - os
        # assumes that unit tests on Indexer and IndexerCodex ensure that
        # .Codes and .Sizes are well formed.
        # hs consistent and hs > 0 and ss > 0 and (fs >= hs + ss if fs is not None else True)

        bcs = sceil(cs * 3 / 4)  # bcs is min bytes to hold cs sextets
        if len(qb2) < bcs:  # need more bytes
            raise ShortageError("Need {} more bytes.".format(bcs - len(qb2)))

        both = codeB2ToB64(qb2, cs)  # extract and convert both hard and soft part of code
        index = b64ToInt(both[hs:hs+ms])  # compute index

        if hard in IdxCrtSigDex:  # if current sig then ondex from code must be 0
            ondex = b64ToInt(both[hs+ms:hs+ms+os]) if os else None  # compute ondex from code
            if ondex:  # not zero or None so error
                raise ValueError(f"Invalid ondex={ondex} for code={hard}.")
            else:
                ondex = None  # zero so set to None when current only
        else:
            ondex = b64ToInt(both[hs+ms:hs+ms+os]) if os else index

        if not fs:  # compute fs from size chars in ss part of code
            if cs % 4:
                raise ValidationError(f"Whole code size not multiple of 4 for "
                                      f"variable length material. cs={cs}.")
            if os != 0:
                raise ValidationError(f"Non-zero other index size for "
                                      f"variable length material. os={os}.")
            fs = (index * 4) + cs

        bfs = sceil(fs * 3 / 4)  # bfs is min bytes to hold fs sextets
        if len(qb2) < bfs:  # need more bytes
            raise ShortageError("Need {} more bytes.".format(bfs - len(qb2)))

        qb2 = qb2[:bfs]  # extract qb2 fully qualified primitive code plus material

        # check for non-zeroed prepad bits or lead bytes
        ps = cs % 4  # code pad size ps = cs mod 4
        pbs = 2 * (ps if ps else ls)  # pad bit size in bits
        if ps:  # ps. IF ps THEN not ls (lead) and vice versa OR not ps and not ls
            # convert last byte of code bytes in which are pad bits to int
            pi = (int.from_bytes(qb2[bcs-1:bcs], "big"))
            if pi & (2 ** pbs - 1):  # masked pad bits non-zero
                raise ValueError(f"Non zeroed pad bits = "
                                 f"{pi & (2 ** pbs - 1 ):>08b} in 0x{pi:02x}.")
        else:  # not ps. IF not ps THEN may or may not be ls (lead)
            li = int.from_bytes(qb2[bcs:bcs+ls], "big")  # lead as int
            if li:  # pre pad lead bytes must be zero
                if ls == 1:
                    raise ValueError(f"Non zeroed lead byte = 0x{li:02x}.")
                else:
                    raise ValueError(f"Non zeroed lead bytes = 0x{li:02x}.")

        raw = qb2[(bcs + ls):]  # strip code and leader bytes from qb2 to get raw

        if len(raw) != (len(qb2) - bcs - ls):  # exact lengths
            raise ConversionError(f"Improperly qualified material = {qb2}")

        self._code = hard
        self._index = index
        self._ondex = ondex
        self._raw = bytes(raw)  # must be bytes for crypto ops and not bytearray mutable
class Siger(Indexer):
    """
    Siger is subclass of Indexer for indexed signature material.
    Adds .verfer property which holds the associated signature verifier,
    if any.

    See Indexer for inherited attributes and properties:

    Attributes:

    Properties:
        verfer (Verfer): instance if any provides public verification key

    Methods:

    Hidden:
        _verfer (Verfer): value for .verfer property
    """

    def __init__(self, verfer=None, **kwa):
        """Initialize instance

        Parameters:  See Indexer for inherited parameters
            verfer (Verfer): instance if any provides public verification key

        Raises:
            ValidationError: when the resulting code is not an indexed
                signature code in IdxSigDex
        """
        super().__init__(**kwa)

        is_sig_code = self.code in IdxSigDex
        if not is_sig_code:
            raise ValidationError("Invalid code = {} for Siger.".format(self.code))

        self.verfer = verfer  # goes through property setter

    @property
    def verfer(self):
        """
        Property verfer:
        Returns ._verfer
        Assumes ._verfer is correctly assigned
        """
        return self._verfer

    @verfer.setter
    def verfer(self, verfer):
        """ verfer property setter, stores value as given without validation """
        self._verfer = verfer
@dataclass(frozen=True)
class CounterCodex:
    """
    Codex of the hard (stable) part of every defined counter derivation code.

    Only defined codes appear as fields. Undefined codes are deliberately
    absent so that membership tests with the 'in' operator tell defined codes
    from undefined ones.
    """
    # Groups of qualified Base64 indexed signatures
    ControllerIdxSigs: str = '-A'
    WitnessIdxSigs: str = '-B'
    # Composed Base64 couple, pre+cig
    NonTransReceiptCouples: str = '-C'
    # Composed Base64 quadruple, pre+snu+dig+sig
    TransReceiptQuadruples: str = '-D'
    # Composed Base64 couple, fnu+dts
    FirstSeenReplayCouples: str = '-E'
    # Composed Base64 group, pre+snu+dig+ControllerIdxSigs group
    TransIdxSigGroups: str = '-F'
    # Composed Base64 couple, snu+dig of given delegators or issuers event
    SealSourceCouples: str = '-G'
    # Composed Base64 group, pre+ControllerIdxSigs group
    TransLastIdxSigGroups: str = '-H'
    # Composed Base64 triple, pre+snu+dig of anchoring source event
    SealSourceTriples: str = '-I'
    # Composed Base64 group, path+TransIdxSigGroup of SAID of content
    SadPathSig: str = '-J'
    # Composed Base64 group, root(path)+SaidPathCouples
    SadPathSigGroup: str = '-K'
    # Composed grouped pathed material quadlet (4 char each)
    PathedMaterialQuadlets: str = '-L'
    # Composed grouped attached material quadlet (4 char each)
    AttachedMaterialQuadlets: str = '-V'
    # Composed grouped attached material quadlet (4 char each), big count
    BigAttachedMaterialQuadlets: str = '-0V'
    # KERI ACDC protocol stack CESR version
    KERIProtocolStack: str = '--AAA'

    def __iter__(self):
        """Iterates over the code values, which makes 'in' tests work."""
        values = astuple(self)
        return iter(values)


CtrDex = CounterCodex()  # shared module level instance
@dataclass(frozen=True)
class ProtocolGenusCodex:
    """
    Codex of protocol genera for the code table.

    Only defined codes appear as fields. Undefined codes are deliberately
    absent so that membership tests with the 'in' operator tell defined codes
    from undefined ones.
    """
    # The KERI and ACDC protocol stacks share the same tables, hence the
    # identical values.
    KERI: str = '--AAA'
    ACDC: str = '--AAA'

    def __iter__(self):
        """Iterates over the code values, which makes 'in' tests work."""
        values = astuple(self)
        return iter(values)


# The duplicate values above simply appear more than once in the iterated
# tuple, so inclusion tests with 'in' still work.
ProDex = ProtocolGenusCodex()  # shared module level instance
@dataclass(frozen=True)
class AltCounterCodex:
    """
    Alternate codex of the hard (stable) part of counter derivation codes.

    It has the same first twelve entries as CounterCodex plus additional
    group and quadlet codes, each of which (except the signature, couple,
    triple and quadruple codes) also has a '-0' prefixed big variant.

    Only defined codes appear as fields. Undefined codes are deliberately
    absent so that membership tests with the 'in' operator tell defined codes
    from undefined ones.
    """
    # Groups of qualified Base64 indexed signatures
    ControllerIdxSigs: str = '-A'
    WitnessIdxSigs: str = '-B'
    # Composed Base64 couple, pre+cig
    NonTransReceiptCouples: str = '-C'
    # Composed Base64 quadruple, pre+snu+dig+sig
    TransReceiptQuadruples: str = '-D'
    # Composed Base64 couple, fnu+dts
    FirstSeenReplayCouples: str = '-E'
    # Composed Base64 group, pre+snu+dig+ControllerIdxSigs group
    TransIdxSigGroups: str = '-F'
    # Composed Base64 couple, snu+dig of given delegators or issuers event
    SealSourceCouples: str = '-G'
    # Composed Base64 group, pre+ControllerIdxSigs group
    TransLastIdxSigGroups: str = '-H'
    # Composed Base64 triple, pre+snu+dig of anchoring source event
    SealSourceTriples: str = '-I'
    # Composed Base64 group, path+TransIdxSigGroup of SAID of content
    SadPathSig: str = '-J'
    # Composed Base64 group, root(path)+SaidPathCouples
    SadPathSigGroup: str = '-K'
    # Composed grouped pathed material quadlet (4 char each)
    PathedMaterialQuadlets: str = '-L'
    # Composed message data group or primitive
    MessageDataGroups: str = '-U'
    # Composed grouped attached material quadlet (4 char each)
    AttachedMaterialQuadlets: str = '-V'
    # Composed grouped message data quadlet (4 char each)
    MessageDataMaterialQuadlets: str = '-W'
    # Combined message data + attachments quadlet (4 char each)
    CombinedMaterialQuadlets: str = '-X'
    # Composed generic material group or primitive
    MaterialGroups: str = '-Y'
    # Composed generic material quadlet (4 char each)
    MaterialQuadlets: str = '-Z'
    # Big variants of the six codes above, in the same order
    BigMessageDataGroups: str = '-0U'
    BigAttachedMaterialQuadlets: str = '-0V'
    BigMessageDataMaterialQuadlets: str = '-0W'
    BigCombinedMaterialQuadlets: str = '-0X'
    BigMaterialGroups: str = '-0Y'
    BigMaterialQuadlets: str = '-0Z'

    def __iter__(self):
        """Iterates over the code values, which makes 'in' tests work."""
        values = astuple(self)
        return iter(values)
class Counter:
    """
    Counter is the fully qualified primitive base class for count codes
    (framing, composition and grouping counts). A Counter consists only of a
    hard code and a count; it carries no raw material of its own.

    Properties:
        code (str): hard (stable) part of the count code
        count (int): count of the grouped material that follows the counter
            (the counted material is not part of the counter)
        qb64 (str): fully qualified Base64 of code + count
        qb64b (bytes): fully qualified Base64 of code + count as bytes
        qb2 (bytes): fully qualified binary of code + count

    Hidden:
        _code (str): value for .code property
        _count (int): value for .count property
        _infil: method that computes qb64b from .code and .count
        _binfil: method that computes qb2 from .code and .count
        _exfil: method that extracts .code and .count from qb64b or qb64
        _bexfil: method that extracts .code and .count from qb2
    """
    Codex = CtrDex

    # Hards table maps the first two Base64 code chars (the code selector)
    # to the hard size, hs, of the code. The soft size, ss, for Counter is
    # always > 0 and hs + ss = fs always.
    Hards = ({('-' + chr(c)): 2 for c in range(65, 65 + 26)})
    Hards.update({('-' + chr(c)): 2 for c in range(97, 97 + 26)})
    Hards.update([('-0', 3)])
    Hards.update([('--', 5)])

    # Sizes table maps the hard code to a Sizage namedtuple of
    # (hs, ss, fs, ls) where hs is hard size, ss is soft size, fs is full
    # size and ls is lead size. For Counter ss > 0 and hs + ss = fs.
    Sizes = {
        '-A': Sizage(hs=2, ss=2, fs=4, ls=0),
        '-B': Sizage(hs=2, ss=2, fs=4, ls=0),
        '-C': Sizage(hs=2, ss=2, fs=4, ls=0),
        '-D': Sizage(hs=2, ss=2, fs=4, ls=0),
        '-E': Sizage(hs=2, ss=2, fs=4, ls=0),
        '-F': Sizage(hs=2, ss=2, fs=4, ls=0),
        '-G': Sizage(hs=2, ss=2, fs=4, ls=0),
        '-H': Sizage(hs=2, ss=2, fs=4, ls=0),
        '-I': Sizage(hs=2, ss=2, fs=4, ls=0),
        '-J': Sizage(hs=2, ss=2, fs=4, ls=0),
        '-K': Sizage(hs=2, ss=2, fs=4, ls=0),
        '-L': Sizage(hs=2, ss=2, fs=4, ls=0),
        '-V': Sizage(hs=2, ss=2, fs=4, ls=0),
        '-0V': Sizage(hs=3, ss=5, fs=8, ls=0),
        '--AAA': Sizage(hs=5, ss=3, fs=8, ls=0),
    }

    # Bards table maps the binary form of the two char code selector to the
    # hard size, hs. Used by ._bexfil.
    Bards = ({codeB64ToB2(c): hs for c, hs in Hards.items()})

    def __init__(self, code=None, count=None, countB64=None,
                 qb64b=None, qb64=None, qb2=None, strip=False):
        """
        Validate as fully qualified.

        Parameters:
            code (str | None): stable (hard) part of derivation code
            count (int | None): count for composition. May represent
                quadlets/triplets, groups, primitives or another number.
                When both count and countB64 are None count defaults to 1.
            countB64 (str | None): count for composition as Base64. Only
                used when count is None.
            qb64b (bytes | bytearray | None): fully qualified text domain
            qb64 (str | None): fully qualified text domain
            qb2 (bytes | bytearray | None): fully qualified binary domain
            strip (bool): True means delete the parsed counter from the
                front of qb64b or qb2 after parsing, which requires that
                input to be a bytearray. Ignored for code and qb64 input.
                Default False.

        The first of code, qb64b, qb64, qb2 that is not None is used.

        Raises:
            InvalidCodeError: code is not in .Sizes
            InvalidCodeSizeError: table entry for code is malformed
            InvalidVarIndexError: count does not fit in the soft part
            EmptyMaterialError: none of code, qb64b, qb64, qb2 provided
        """
        if code is not None:  # code provided
            if code not in self.Sizes:
                raise InvalidCodeError("Unsupported code={}.".format(code))

            hs, ss, fs, ls = self.Sizes[code]  # get sizes for code
            cs = hs + ss  # both hard + soft code size
            if fs != cs or cs % 4:  # fs must be cs and multiple of 4
                raise InvalidCodeSizeError("Whole code size not full size or not "
                                           "multiple of 4. cs={} fs={}.".format(cs, fs))

            if count is None:
                count = 1 if countB64 is None else b64ToInt(countB64)

            if count < 0 or count > (64 ** ss - 1):
                raise InvalidVarIndexError("Invalid count={} for code={}.".format(count, code))

            self._code = code
            self._count = count

        elif qb64b is not None:
            self._exfil(qb64b)
            if strip:  # assumes bytearray
                del qb64b[:self.Sizes[self.code].fs]

        elif qb64 is not None:
            self._exfil(qb64)

        elif qb2 is not None:
            self._bexfil(qb2)
            if strip:  # assumes bytearray; fs sextets occupy fs * 3 / 4 bytes
                del qb2[:self.Sizes[self.code].fs * 3 // 4]

        else:
            raise EmptyMaterialError("Improper initialization need either "
                                     "(code and count) or qb64b or "
                                     "qb64 or qb2.")

    @property
    def code(self):
        """
        Returns ._code
        Makes .code read only
        """
        return self._code

    @property
    def count(self):
        """
        Returns ._count
        Makes .count read only
        """
        return self._count

    @property
    def qb64b(self):
        """
        Property qb64b:
        Returns fully qualified Base64 version encoded as bytes
        Assumes .code and .count are correctly populated
        """
        return self._infil()

    @property
    def qb64(self):
        """
        Property qb64:
        Returns fully qualified Base64 version as str
        Assumes .code and .count are correctly populated
        """
        return self.qb64b.decode("utf-8")

    @property
    def qb2(self):
        """
        Property qb2:
        Returns fully qualified binary version as bytes
        """
        return self._binfil()

    def countToB64(self, l=None):
        """
        Returns count as Base64 left padded with "A"s

        Parameters:
            l (int | None): minimum number of characters including left
                padding. When not provided the soft size of .code is used.
        """
        if l is None:
            _, ss, _, _ = self.Sizes[self.code]
            l = ss
        return (intToB64(self.count, l=l))

    @staticmethod
    def semVerToB64(version="", major=0, minor=0, patch=0):
        """
        Converts a semantic version to the Base64 representation of countB64
        suitable for a CESR protocol genus and version counter.

        Returns:
            countB64 (str): suitable for input to Counter
            example: Counter(countB64=semVerToB64(version="1.0.0"))

        Parameters:
            version (str | None): dot separated semantic version string of
                format "major.minor.patch". Empty parts count as 0. Parts
                missing from the end are filled in from minor and patch.
            major (int): used when version is None or empty
            minor (int): used when version is None or empty or too short
            patch (int): used when version is None or empty or too short

        Raises:
            ValueError: when any part is outside [0, 63], the range that
                fits in one Base64 character
        """
        parts = [major, minor, patch]
        if version:
            splits = version.split(".", maxsplit=3)
            splits = [(int(s) if s else 0) for s in splits]
            # fill missing trailing parts from the keyword defaults
            for i in range(3 - len(splits), 0, -1):
                splits.append(parts[-i])
            parts = splits

        for p in parts:
            if p < 0 or p > 63:
                raise ValueError(f"Out of bounds semantic version. "
                                 f"Part={p} is < 0 or > 63.")
        return ("".join(intToB64(p, l=1) for p in parts))

    def _infil(self):
        """
        Returns fully qualified Base64 bytes computed from .code and .count.

        Raises:
            InvalidCodeSizeError: table entry for .code is malformed
            InvalidVarIndexError: .count does not fit in the soft part
        """
        code = self.code  # codex value chars hard code
        count = self.count  # index value int used for soft

        hs, ss, fs, ls = self.Sizes[code]
        cs = hs + ss  # both hard + soft size
        if fs != cs or cs % 4:  # fs must be cs and multiple of 4
            raise InvalidCodeSizeError("Whole code size not full size or not "
                                       "multiple of 4. cs={} fs={}.".format(cs, fs))
        if count < 0 or count > (64 ** ss - 1):
            raise InvalidVarIndexError("Invalid count={} for code={}.".format(count, code))

        # both is hard code + converted count
        both = "{}{}".format(code, intToB64(count, l=ss))

        # whole code must fall on a quadlet boundary so no pad is needed
        if len(both) % 4:
            raise InvalidCodeSizeError("Invalid size = {} of {} not a multiple of 4."
                                       .format(len(both), both))
        return (both.encode("utf-8"))

    def _binfil(self):
        """
        Returns fully qualified base2 bytes, that is .qb2, computed by
        converting hard code + Base64 count with codeB64ToB2.

        Raises:
            InvalidCodeSizeError: table entry for .code is malformed or the
                composed code does not have the table's size
            InvalidVarIndexError: .count does not fit in the soft part
        """
        code = self.code  # codex chars hard code
        count = self.count  # index value int used for soft

        hs, ss, fs, ls = self.Sizes[code]
        cs = hs + ss
        if fs != cs or cs % 4:  # fs must be cs and multiple of 4
            raise InvalidCodeSizeError("Whole code size not full size or not "
                                       "multiple of 4. cs={} fs={}.".format(cs, fs))

        if count < 0 or count > (64 ** ss - 1):
            raise InvalidVarIndexError("Invalid count={} for code={}.".format(count, code))

        # both is hard code + converted count
        both = "{}{}".format(code, intToB64(count, l=ss))
        if len(both) != cs:
            # actual composed size first, table size second
            raise InvalidCodeSizeError("Mismatch code size = {} with table = {}."
                                       .format(len(both), cs))

        return (codeB64ToB2(both))  # convert to b2 left shift if any

    def _exfil(self, qb64b):
        """
        Extracts .code and .count from fully qualified Base64 qb64b.

        Parameters:
            qb64b (str | bytes | bytearray): text domain counter, possibly
                followed by more stream content

        Raises:
            ShortageError: not enough characters to hold the counter
            UnexpectedOpCodeError: material starts with op code char '_'
            UnexpectedCodeError: code selector or hard code not supported
        """
        if not qb64b:  # empty need more bytes
            raise ShortageError("Empty material, Need more characters.")

        first = qb64b[:2]  # extract first two char code selector
        if hasattr(first, "decode"):
            first = first.decode("utf-8")
        if first not in self.Hards:
            if first[0] == '_':
                raise UnexpectedOpCodeError("Unexpected op code start "
                                            "while extracting Counter.")
            else:
                raise UnexpectedCodeError("Unsupported code start ={}.".format(first))

        hs = self.Hards[first]  # get hard code size
        if len(qb64b) < hs:  # need more bytes
            raise ShortageError("Need {} more characters.".format(hs - len(qb64b)))

        hard = qb64b[:hs]  # get hard code
        if hasattr(hard, "decode"):
            hard = hard.decode("utf-8")  # Sizes keys are str not bytes
        if hard not in self.Sizes:
            raise UnexpectedCodeError("Unsupported code ={}.".format(hard))

        hs, ss, fs, ls = self.Sizes[hard]  # assumes hs consistent in both tables
        cs = hs + ss  # both hard + soft code size

        # Assumes the tables are well formed: hs consistent and hs > 0 and
        # ss > 0 and fs = hs + ss and not fs % 4
        if len(qb64b) < cs:  # need more bytes
            raise ShortageError("Need {} more characters.".format(cs - len(qb64b)))

        count = qb64b[hs:hs + ss]  # extract count chars
        if hasattr(count, "decode"):
            count = count.decode("utf-8")
        count = b64ToInt(count)  # compute int count

        self._code = hard
        self._count = count

    def _bexfil(self, qb2):
        """
        Extracts .code and .count from fully qualified base2 bytes qb2.

        Parameters:
            qb2 (bytes | bytearray): binary domain counter, possibly
                followed by more stream content

        Raises:
            ShortageError: not enough bytes to hold the counter
            UnexpectedOpCodeError: first sextet is the op code sextet, the
                binary form of '_'
            UnexpectedCodeError: code selector or hard code not supported
        """
        if not qb2:  # empty need more bytes
            raise ShortageError("Empty material, Need more bytes.")

        first = nabSextets(qb2, 2)  # extract first two sextets as code selector
        if first not in self.Bards:
            # Indexing bytes yields an int, so compare ints. The first
            # sextet is the top six bits of the first byte and the op code
            # char '_' is sextet 0b111111, so mask off the two low bits that
            # belong to the second sextet.
            if (first[0] & 0xfc) == 0xfc:
                raise UnexpectedOpCodeError("Unexpected op code start "
                                            "while extracting Counter.")
            else:
                raise UnexpectedCodeError("Unsupported code start sextet={}.".format(first))

        hs = self.Bards[first]  # get code hard size equivalent sextets
        bhs = sceil(hs * 3 / 4)  # bhs is min bytes to hold hs sextets
        if len(qb2) < bhs:  # need more bytes
            raise ShortageError("Need {} more bytes.".format(bhs - len(qb2)))

        hard = codeB2ToB64(qb2, hs)  # extract and convert hard part of code
        if hard not in self.Sizes:
            raise UnexpectedCodeError("Unsupported code ={}.".format(hard))

        hs, ss, fs, ls = self.Sizes[hard]
        cs = hs + ss  # both hs and ss

        # Assumes the tables are well formed: hs consistent and hs > 0 and
        # ss > 0 and fs = hs + ss and not fs % 4
        bcs = sceil(cs * 3 / 4)  # bcs is min bytes to hold cs sextets
        if len(qb2) < bcs:  # need more bytes
            raise ShortageError("Need {} more bytes.".format(bcs - len(qb2)))

        both = codeB2ToB64(qb2, cs)  # extract and convert both hard and soft part of code
        count = b64ToInt(both[hs:hs + ss])  # get count

        self._code = hard
        self._count = count
class Sadder:
    """
    Sadder is the self addressed data (SAD) serializer-deserializer class.

    Creating a Sadder does not verify its said, it merely extracts it from
    the 'd' field. To obtain a verified said call
    .saider.verify(sad=self.ked) afterwards.

    Properties:
        raw (bytes): serialized SAD only
        ked (dict): self addressed data dict
        kind (str): serialization kind, a coring.Serials value
        size (int): number of bytes in serialization
        version (Versionage): protocol version
        proto (str): Protocolage value as protocol identifier
        saider (Saider): built from .ked['d']
        said (str): qb64 of .saider
        saidb (bytes): qb64b of .saider

    Methods:
        compare: compares a purported said with .saidb
        pretty: JSON rendering of .ked truncated to a size limit

    Hidden Attributes:
        ._raw (bytes): serialized SAD only
        ._ked (dict): self addressed data dict
        ._kind (str): serialization kind
        ._size (int): number of bytes in serialization
        ._version (Versionage): version of the SAD
        ._proto (str): Protocolage value as protocol type identifier
        ._saider (Saider): instance for this Sadder's SAID
        ._code (str): digest code handed to Saider

    Note:
        loads and dumps of json use str whereas cbor and msgpack use bytes
    """

    def __init__(self, raw=b'', ked=None, sad=None, kind=None, saidify=False,
                 code=MtrDex.Blake3_256):
        """
        Deserialize when raw is provided, else serialize when ked is
        provided, else copy from sad. The embedded said is not verified.

        Parameters:
            raw (bytes): serialized SAD
            ked (dict | None): self addressed data dict to serialize
            sad (Sadder | None): instance whose fields are copied
            kind (str | None): serialization kind used when serializing
                ked. Passed on to the serializer as is.
            saidify (bool): accepted but currently not used by this method
            code (str): digest code passed to Saider for .saider

        Raises:
            ValueError: when raw, ked and sad are all falsy
        """
        self._code = code  # needed before any property setter runs

        if raw:
            self.raw = raw  # property setter deserializes
            return

        if ked:
            # ToDo: when ked is passed with saidify True compute the said
            self._kind = kind  # ked setter reads ._kind
            self.ked = ked  # property setter serializes
            return

        if sad:
            # ToDo: is this needed or should ked with saidify be used instead
            self._clone(sad=sad)
            return

        raise ValueError("Improper initialization need sad, raw or ked.")

    def _clone(self, sad):
        """Copies the public property values of sad into the hidden
        attributes of this instance."""
        for name in ("raw", "ked", "kind", "size", "version", "proto",
                     "saider"):
            setattr(self, "_" + name, getattr(sad, name))

    def _inhale(self, raw):
        """
        Parses serialized SAD raw and returns tuple
        (ked, proto, kind, version, size).

        Parameters:
            raw (bytes): serialized SAD, may be followed by more content

        Raises:
            VersionError: sniffed version differs from Version
            ShortageError: raw is shorter than the sniffed size

        Note:
            loads and dumps of json use str whereas cbor and msgpack use bytes
        """
        proto, kind, version, size = sniff(raw)

        if version != Version:
            raise VersionError(f"Unsupported version = {version.major}."
                               f"{version.minor}, expected {Version}.")

        if len(raw) < size:
            raise ShortageError("Need more bytes.")

        return loads(raw=raw, size=size, kind=kind), proto, kind, version, size

    def _exhale(self, ked, kind=None):
        """
        Returns sizeify(ked, kind), which is unpacked by the callers as
        tuple (raw, proto, kind, ked, version).

        Parameters:
            ked (dict): key event dict or sad dict
            kind (str | None): serialization kind passed on to sizeify
        """
        return sizeify(ked=ked, kind=kind)

    def _reserialize(self, ked, kind):
        """Serializes ked as kind and refreshes every hidden attribute from
        the result. Shared by the ked and kind property setters."""
        raw, proto, kind, ked, version = self._exhale(ked=ked, kind=kind)
        size = len(raw)
        self._raw = raw[:size]
        self._ked = ked
        self._proto = proto
        self._kind = kind
        self._size = size
        self._version = version
        self._saider = Saider(qb64=ked["d"], code=self._code)

    def compare(self, said=None):
        """
        Returns True when said equals .saidb. A said with an encode method
        (a str) is first encoded as utf-8.

        Parameters:
            said (str | bytes): qb64 or qb64b SAID to compare with .saidb

        Raises:
            ValueError: when said is None
        """
        if said is None:
            raise ValueError("Both said and saider may not be None.")

        if hasattr(said, "encode"):
            said = said.encode('utf-8')  # makes bytes
        return said == self.saidb

    @property
    def raw(self):
        """ raw property getter """
        return self._raw

    @raw.setter
    def raw(self, raw):
        """ raw property setter, deserializes raw into all hidden attributes """
        ked, proto, kind, version, size = self._inhale(raw=raw)
        self._raw = bytes(raw[:size])  # crypto ops require bytes not bytearray
        self._ked = ked
        self._proto = proto
        self._kind = kind
        self._version = version
        self._size = size
        self._saider = Saider(qb64=ked["d"], code=self._code)

    @property
    def ked(self):
        """ ked property getter """
        return self._ked

    @ked.setter
    def ked(self, ked):
        """ ked property setter, serializes ked using ._kind """
        self._reserialize(ked=ked, kind=self._kind)

    @property
    def kind(self):
        """ kind property getter """
        return self._kind

    @kind.setter
    def kind(self, kind):
        """ kind property setter, reserializes ._ked as kind """
        self._reserialize(ked=self._ked, kind=kind)

    @property
    def size(self):
        """ size property getter """
        return self._size

    @property
    def version(self):
        """
        version property getter

        Returns:
            (Versionage):
        """
        return self._version

    @property
    def proto(self):
        """
        proto property getter,
        protocol identifier type value of Protocolage such as 'KERI' or 'ACDC'

        Returns:
            (str): Protocolage value as protocol type
        """
        return self._proto

    @property
    def saider(self):
        """ saider property getter, returns ._saider """
        return self._saider

    @property
    def said(self):
        """ said property getter, returns str qb64 of .saider """
        return self.saider.qb64

    @property
    def saidb(self):
        """ saidb property getter, returns bytes qb64b of .saider """
        return self.saider.qb64b

    def pretty(self, *, size=1024):
        """
        Returns str JSON of .ked indented by one space and truncated to at
        most size characters. size None means no truncation.

        The default of 1024 suits size limited consumers such as syslog
        over UDP.
        """
        text = json.dumps(self.ked, indent=1)
        return text[:size]  # slicing with None keeps everything
[docs] class Tholder: """ Tholder is KERI Signing Threshold Satisfaction class .satisfy method evaluates satisfaction based on ordered list of indices of verified signatures where indices correspond to offsets in key list of associated signatures. Has the following public properties: Properties: .weighted is Boolean True if fractional weighted threshold False if numeric .size is int of minimum size of keys list when weighted is size of keys list when unweighted is size of int thold since don't have anyway to know size of keys list in this case .limen is qualified b64 signing threshold suitable for CESR serialization. either Number.qb64b or Bexter.qb64b. The b64 portion of limen with code stripped (Bexter.bext) of [["1/2", "1/2", "1/4", "1/4", "1/4"], ["1", "1"]] is '1s2c1s2c1s4c1s4c1s4a1c1' basically slash is 's', comma is 'c', and ANDed clauses are delimited by 'a'. .sith is original signing threshold suitable for value to be serialized as json, cbor, mgpk in key event message as either: non-negative hex number str or list of str rational number fractions >= 0 and <= 1 or list of list of str rational number fractions >= 0 and <= 1 .thold is parsed signing threshold suitable for calculating satisfaction. either as int or list of Fractions .num is int signing threshold when not ._weighted Methods: .satisfy returns bool, True means ilist of verified signature key indices satisfies threshold, False otherwise. 
Static Methods: weight (str): converts weight str expression into either int or Fraction else raises ValueError must satisfy 0 <= w <= 1 Ensures strict proper rational number fraction of ints or 0 or 1 Hidden: ._weighted is Boolean, True if fractional weighted threshold False if numeric ._size is int minimum size of of keys list ._sith is signing threshold for .sith property ._thold is signing threshold for .thold propery ._bexter is Bexter instance of weighted signing threshold or None ._number is Number instance of integer threshold or None ._satisfy is method reference of threshold specified verification method ._satisfy_numeric is numeric threshold verification method ._satisfy_weighted is fractional weighted threshold verification method """
[docs] def __init__(self, *, thold=None , limen=None, sith=None, **kwa): """ Accepts signing threshold in various forms so that may output correct forms for serialization and/or calculation of satisfaction. Parameters: sith is signing threshold (current or next) expressed as either: non-negative int of threshold number (M-of-N threshold) next threshold may be zero non-negative hex string of threshold number (M-of-N threshold) next threshold may be zero fractional weight clauses which may be expressed as either: an iterable of rational number fraction strings >= 0 and <= 1 an iterable of iterables of rational number fraction strings >= 0 and <= 1 JSON serialized str of either: list of rational number fraction strings >= 0 and <= 1 or list of list of rational number fraction strings >= 0 and <= 1 limen is qualified signing threshold (current or next) expressed as either: Number.qb64 or .qb64b of integer threshold or Bexter.qb64 or .qb64b of fractional weight clauses which may be either: Base64 delimited clauses of fractions Base64 delimited clauses of fractions thold is signing threshold (current or next) is suitable for computing the satisfaction of a threshold and is expressed as either: int of threshold number (M of N) fractional weight clauses which may be expressed as either: an iterable of Fractions or an iterable of iterables of Fractions. The sith representation is meant to parse threhold expressions from deserializations of JSON, CBOR, or MGPK key event message fields or the command line or configuration files. The limen representation is meant to parse threshold expressions from CESR serializations of key event message fields or attachments. 
The thold representation is meant to accept thresholds from computable expressions for satisfaction of a threshold """ if thold is not None: self._processThold(thold=thold) elif limen is not None: self._processLimen(limen=limen, **kwa) # kwa for strip elif sith is not None: if isinstance(sith, str) and not sith: # empty str raise EmptyMaterialError("Empty threshold expression.") self._processSith(sith=sith) else: raise EmptyMaterialError("Missing threshold expression.")
@property def weighted(self): """ weighted property getter """ return self._weighted @property def thold(self): """ thold property getter """ return self._thold @property def size(self): """ size property getter """ return self._size @property def limen(self): """ limen property getter """ return self._bexter.qb64b if self._weighted else self._number.qb64b @property def sith(self): """ sith property getter """ # make sith expression of thold if self.weighted: sith = [[f"{f.numerator}/{f.denominator}" if (0 < f < 1) else f"{int(f)}" for f in clause] for clause in self.thold] if len(sith) == 1: sith = sith[0] # simplify list of one clause to clause else: sith = f"{self.thold:x}" return sith @property def json(self): """Returns json serialization of sith expression Essentially JSON list of lists of strings """ return json.dumps(self.sith) @property def num(self): """ sith property getter """ return self.thold if not self._weighted else None def _processThold(self, thold: int | Iterable): """Process thold input Parameters: thold (int | Iterable): computable thold expression """ if isinstance(thold, int): self._processUnweighted(thold=thold) else: self._processWeighted(thold=thold) def _processLimen(self, limen: str | bytes, **kwa): """Process limen input Parameters: limen (str): CESR encoded qb64 threshold (weighted or unweighted) """ matter = Matter(qb64b=limen, **kwa) # kwa for strip of stream if matter.code in NumDex: number = Number(raw=matter.raw, code=matter.code, **kwa) self._processUnweighted(thold=number.num) elif matter.code in BexDex: # Convert to fractional thold expression bexter = Bexter(raw=matter.raw, code=matter.code, **kwa) t = bexter.bext.replace('s', '/') # get clauses thold = [clause.split('c') for clause in t.split('a')] thold = [[self.weight(w) for w in clause] for clause in thold] self._processWeighted(thold=thold) else: raise InvalidCodeError(f"Invalid code for limen = {matter.code}.") def _processSith(self, sith: int | str | Iterable): """ 
Process attributes for fractionall weighted threshold sith Parameters: sith is signing threshold (current or next) expressed as either: non-negative int of threshold number (M-of-N threshold) next threshold may be zero non-negative hex string of threshold number (M-of-N threshold) next threshold may be zero fractional weight clauses which may be expressed as either: an iterable of rational number fraction weight str or int str each denoted w where 0 <= w <= 1 an iterable of iterables of rational number fraction weight or int str each denoted w where 0 <= w <= 1>= 0 JSON serialized str of either: list of rational number fraction weight strings each denoted w where 0 <= w <= 1 list of lists of rational number fraction weight strings each denoted w where 0 <= w <= 1 when any w is 0 or 1 then representation is 0 or 1 not 0/1 or 1/1 """ if isinstance(sith, int): self._processUnweighted(thold=sith) elif isinstance(sith, str) and '[' not in sith: self._processUnweighted(thold=int(sith, 16)) else: # assumes iterable of weights or iterable of iterables of weights if isinstance(sith, str): # json of weighted sith from cli sith = json.loads(sith) # deserialize if not sith: # empty iterable raise ValueError(f"Empty weight list = {sith}.") # because all([]) == True have to also test for emply mask # is it non str iterable of non str iterable of strs mask = [nonStringIterable(c) for c in sith] if mask and not all(mask): # not empty and not iterable of iterables sith = [sith] # attempt to make Iterable of Iterables for c in sith: # get each clause mask = [isinstance(w, str) for w in c] # must be all strs if mask and not all(mask): # not empty and not iterable of strs? raise ValueError(f"Invalid sith = {sith} some weights in" f"clause {c} are non string.") # replace weight str expression, int str or fractional strings with # int or fraction as appropriate. 
thold = [] for clause in sith: # convert string fractions to Fractions # append list of weights converted fromnn str expression thold.append([self.weight(w) for w in clause]) self._processWeighted(thold=thold) def _processUnweighted(self, thold=0): """ Process attributes for unweighted (numeric) threshold thold Parameters: thold (int): non-negative threshold number M-of-N threshold """ if thold < 0: raise ValueError(f"Non-positive int threshold = {thold}.") self._thold = thold self._weighted = False self._size = self._thold # used to verify that keys list size is at least size self._satisfy = self._satisfy_numeric self._number = Number(num=thold) self._bexter = None def _processWeighted(self, thold=[]): """ Process attributes for fractionall weighted threshold thold Parameters: thold (iterable): iterable or iterable or iterables of rational number fraction strings >= 0 and <= 1 """ for clause in thold: # sum of fractions in clause must be >= 1 if not (sum(clause) >= 1): raise ValueError(f"Invalid sith clause = {thold}, all " f"clause weight sums must be >= 1.") self._thold = thold self._weighted = True self._size = sum(len(clause) for clause in thold) self._satisfy = self._satisfy_weighted # make bext str of thold for .bexter for limen bext = [[f"{f.numerator}s{f.denominator}" if (0 < f < 1) else f"{int(f)}" for f in clause] for clause in thold] bext = "a".join(["c".join(clause) for clause in bext]) self._number = None self._bexter = Bexter(bext=bext) @staticmethod def _oldcheckWeight(w: Fraction) -> Fraction: """Returns w if 0 <= w <= 1 Else raises ValueError Parameters: w (Fraction): Threshold weight Fraction """ if not 0 <= w <= 1: raise ValueError(f"Invalid weight not 0 <= {w} <= 1.") return w
[docs] @staticmethod def weight(w: str) -> Fraction: """Returns valid weight from w else raises error (ValueError or TypeError). w expression must evaluate to 0, 1, or strict proper rational fraction. w expression must be 0 <= w <= 1 Else raises ValueError w must not be float else raises TypeError When not int w must be ratio of integers n/d else raise ValueError. Parameters: w (str): threshold weight expression """ try: # float str or ratio str raises ValueError if int(float(w)) != float(w): # float str raise TypeError("Invalid weight str got float w={w}.") w = int(w) # expression is int str except TypeError as ex: raise ValueError(str(ex)) from ex except ValueError as ex: # not float str or int str so try ration str w = Fraction(w) if not 0 <= w <= 1: raise ValueError(f"Invalid weight not 0 <= {w} <= 1.") return w
[docs] def satisfy(self, indices): """ Returns True if indices list of verified signature key indices satisfies threshold, False otherwise. Parameters: indices is list of non-negative indices (offsets into key list) of verified signatures. the indices may be in any order, they are normalized herein """ return (self._satisfy(indices=indices))
def _satisfy_numeric(self, indices):
    """
    Returns True if satisfies numeric threshold False otherwise

    A threshold of zero is never satisfied. Each distinct index counts once
    so that a repeated index cannot be counted twice toward the threshold.

    Parameters:
        indices (list): indices (offsets into key list) of verified
            signatures, in any order
    """
    try:
        # at least one and at least thold distinct verified signatures
        if self.thold > 0 and len(set(indices)) >= self.thold:
            return True
    except Exception:
        # deliberate best effort, malformed indices never satisfy
        return False
    return False


def _satisfy_weighted(self, indices):
    """
    Returns True if satisfies fractional weighted threshold False otherwise

    Every clause of .thold must have verified weights that sum to at least 1.
    Weights are matched to indices by position across the clauses in order.

    Parameters:
        indices (list): non-negative indices (offsets into key list) of
            verified signatures. The indices may be in any order, they are
            de-duplicated and sorted herein. A negative index or an index
            beyond .size makes the result False.
    """
    try:
        if not indices:  # empty indices
            return False

        # remove duplicates with set, sort low to high
        indices = sorted(set(indices))
        if indices[0] < 0:
            # a negative offset would wrap around and credit a key at the
            # end of the list that was never verified
            return False

        sats = [False] * self.size  # default all satisfactions to False
        for idx in indices:
            sats[idx] = True  # set verified signature index to True

        wio = 0  # weight index offset, runs across all clauses
        for clause in self.thold:
            cw = 0  # init clause weight
            for w in clause:
                if sats[wio]:  # verified signature so weight applies
                    cw += w
                wio += 1
            if cw < 1:  # each clause must sum to at least 1
                return False

        return True  # all clauses including final one cw >= 1

    except Exception:
        # deliberate best effort, out of range or malformed indices never
        # satisfy
        return False
class Dicter:
    """
    Dicter class is base class for objects that can be stored in a Suber

    Dicter classes can be initialized by a dict and then expose bytes of JSON
    in the .raw property

    Subclasses can add semantically appropriate properties that extract / add
    specific keys to the underlying dict .pad

    Properties:
        raw (bytes): JSON serialization of .pad
        pad (dict): data dict, always holds a non-empty value at the label
        rid (str): value of the label field of .pad
    """

    def __init__(self, raw=b'', pad=None, sad=None, label=Saids.i):
        """
        Create Dicter from raw bytes, pad dict or another instance, checked
        in that order. The first non-empty one is used.

        Parameters:
            raw (bytes): raw JSON of dicter class
            pad (dict): data dict for class
            sad (Dicter): instance whose raw, pad and rid are taken over
            label (str): field name of the SAID field.

        Raises:
            ValueError: when raw, pad and sad are all empty
        """
        self._label = label
        if raw:  # deserialize raw using property setter
            self.raw = raw  # raw property setter does the deserialization
        elif pad:  # serialize pad using property setter
            self.pad = pad  # pad property setter does the serialization
        elif sad:
            self._clone(sad=sad)
        else:
            raise ValueError("Improper initialization need sad, raw or pad.")

    def _clone(self, sad):
        """
        Take over raw, pad and rid from sad

        The pad dict is taken by reference, it is not copied.
        """
        self._raw = sad.raw
        self._pad = sad.pad
        self._rid = sad.rid

    def _ensureRid(self):
        """
        Returns True when a new random nonce had to be put at the label
        field of ._pad because the field was missing or empty, False
        otherwise
        """
        if self._label not in self._pad or self._pad[self._label] == "":
            self._pad[self._label] = randomNonce()
            return True
        return False

    @property
    def raw(self):
        """ raw property getter """
        return self._raw

    @raw.setter
    def raw(self, raw):
        """
        raw property setter, deserializes raw into .pad

        When raw lacks a value at the label field a nonce is generated and
        raw is serialized again so that .raw, .pad and .rid stay consistent.
        """
        self._raw = raw
        self._pad = json.loads(self._raw.decode("utf-8"))
        if self._ensureRid():
            self._raw = json.dumps(self._pad).encode("utf-8")
        self._rid = self._pad[self._label]

    @property
    def pad(self):
        """ pad property getter"""
        return self._pad

    @pad.setter
    def pad(self, pad):
        """
        pad property setter, serializes pad into .raw

        The given dict is kept by reference and gets a nonce assigned at the
        label field when that field is missing or empty.
        """
        self._pad = pad
        self._ensureRid()
        self._raw = json.dumps(self._pad).encode("utf-8")
        self._rid = self._pad[self._label]

    @property
    def rid(self):
        """ ID of dict data as str """
        return self._rid

    def pretty(self, *, size=1024):
        """
        Returns str JSON of .pad with pretty formatting

        Parameters:
            size (int | None): maximum number of characters returned,
                None means no limit

        ToDo: add default size limit on pretty when used for syslog UDP MCU
        like 1024 for ogler.logger
        """
        # slicing with None as the stop keeps the whole string
        return json.dumps(self.pad, indent=1)[:size]
def randomNonce():
    """
    Generate a random ed25519 seed and encode as qb64

    Returns:
        str: qb64 encoded ed25519 random seed
    """
    seed = pysodium.randombytes(pysodium.crypto_sign_SEEDBYTES)
    return Matter(raw=seed, code=MtrDex.Ed25519_Seed).qb64