# -*- encoding: utf-8 -*-
"""
keri.core.scheming module
self-addressing and schema support
"""
import json
import cbor2 as cbor
import jsonschema
import msgpack
import referencing
import referencing.exceptions
import referencing.jsonschema
from hio.help import ogler
from .coring import MtrDex, Kinds, Saider, Saids, dumps
from ..kering import ValidationError, DeserializeError
logger = ogler.getLogger()
[docs]
class CacheResolver:
    """ jsonschema reference resolver backed by a local schema database.

    Schemas are stored in and looked up from ``db.schema``. Lookups made on
    behalf of jsonschema use the text after the last ':' of the URI as the key.
    """

    def __init__(self, db):
        """ Create a resolver that serves schema references from db.

        Parameters:
            db (Baser): database instance whose ``schema`` sub-database stores
                and retrieves json schema SADs
        """
        self.db = db

    def add(self, key, schema):
        """ Add schema to the cache for later resolution.

        The schema is parsed into a Schemer and stored only when the Schemer's
        said equals key; otherwise the call returns without storing anything.

        Parameters:
            key (str): URI to resolve to the schema
            schema (bytes): serialized schema for the URI
        """
        parsed = Schemer(raw=schema)
        if parsed.said == key:
            self.db.schema.pin(key, parsed)

    def resolve(self, uri):
        """ Return the raw serialization of the schema stored under uri.

        The uri is used as the database key as given (no prefix is removed).

        Parameters:
            uri (str): key of the schema to look up

        Returns:
            bytes: ``.raw`` of the stored entry, or None when the lookup
                returns None
        """
        found = self.db.schema.get(uri)
        return None if found is None else found.raw

    @staticmethod
    def _key(uri):
        """ Return the part of uri after its last ':' (all of uri if it has none). """
        # rpartition yields ('', '', uri) when ':' is absent, so index 2 is
        # the whole uri in that case.
        return uri.rpartition(":")[2]

    def handler(self, uri):
        """ Handler provided to jsonschema for cache resolution.

        Parameters:
            uri (str): the URI to resolve; only the text after the last ':'
                is used as the lookup key

        Returns:
            dict: ``.sed`` of the stored entry, or None when nothing is found
        """
        found = self.db.schema.get(self._key(uri))
        return found.sed if found else None

    def resolver(self, scer=b''):
        """ Locally cached schema resolver.

        Returns a referencing.Registry whose retrieve callback serves locally
        cached schema based on self-addressing identifier URIs.

        Parameters:
            scer (Optional(bytes)): source document that is being processed
                for reference resolution (not used by this method)
        """
        def retrieve(uri):
            # Key is the text after the last ':' of the URI, as in handler().
            found = self.db.schema.get(self._key(uri))
            if found:
                return referencing.jsonschema.DRAFT7.create_resource(found.sed)
            raise referencing.exceptions.NoSuchResource(ref=uri)

        return referencing.Registry(retrieve=retrieve)
[docs]
class JSONSchema:
    """ JSON Schema support class

    Deserializes, serializes, detects and validates JSON Schema documents
    whose ID field (label ``id_``) holds a self-addressing identifier.
    """
    id_ = Saids.dollar  # ID Field Label

    def __init__(self, resolver=None):
        """ Initialize instance

        Parameters:
            resolver(Optional(Resolver)): instance used by JSONSchema parsing
                to resolve external refs
        """
        self.resolver = resolver

    def resolve(self, uri):
        """ Resolve remote reference to schema

        Parameters:
            uri (str): uniform resource identifier of schema to load

        Returns:
            result of ``resolver.resolve(uri)``, or None when no resolver
            is configured
        """
        if self.resolver is None:
            return None
        return self.resolver.resolve(uri)

    def load(self, raw, kind=Kinds.json):
        """ Schema loader

        Loads schema based on kind by performing deserialization on raw bytes
        of schema, then verifies the self-addressing identifier found in the
        ID field.

        Parameters:
            raw (bytes): raw serialized schema
            kind (Optional(Serials)): serialization kind of schema raw content

        Returns:
            tuple: (dict, Serials, Saider) of schema

        Raises:
            DeserializeError: raw can not be deserialized as kind
            ValueError: kind is not one of the supported serialization kinds
            ValidationError: the ID field is missing (or the document is not
                a mapping) or the identifier fails verification
        """
        if kind == Kinds.json:
            try:
                sed = json.loads(raw.decode("utf-8"))
            except Exception as ex:
                # errors="replace" keeps building the message from raising a
                # second UnicodeDecodeError when raw is not valid UTF-8, which
                # would otherwise escape instead of DeserializeError.
                text = raw.decode("utf-8", errors="replace")
                raise DeserializeError("Error deserializing JSON: {} {}"
                                       "".format(text, ex)) from ex
        elif kind == Kinds.mgpk:
            try:
                sed = msgpack.loads(raw)
            except Exception as ex:
                raise DeserializeError("Error deserializing MGPK: {} {}"
                                       "".format(raw, ex)) from ex
        elif kind == Kinds.cbor:
            try:
                sed = cbor.loads(raw)
            except Exception as ex:
                raise DeserializeError("Error deserializing CBOR: {} {}"
                                       "".format(raw, ex)) from ex
        else:
            raise ValueError("Invalid serialization kind = {}".format(kind))

        # A document that is not a mapping (number, string, list, ...) can not
        # carry an ID field; report it the same way as a missing ID instead of
        # failing with TypeError on the membership test or the index below.
        if not isinstance(sed, dict) or self.id_ not in sed:
            raise ValidationError("missing ID field {} in schema = {}"
                                  "".format(self.id_, sed))

        said = sed[self.id_]
        saider = Saider(qb64=said, label=self.id_)
        if not saider.verify(sed, prefixed=True, kind=kind, label=self.id_):
            raise ValidationError("invalid self-addressing identifier {} instead of {} in schema = {}"
                                  "".format(said, saider.qb64, sed))

        return sed, kind, saider

    @staticmethod
    def dump(sed, kind=Kinds.json):
        """ Serialize schema based on kind

        Parameters:
            sed (dict): in memory representation of schema
            kind (Optional(Serials)): kind of serialization to perform.
                Defaults to JSON

        Returns:
            bytes: Serialized schema
        """
        return dumps(sed, kind)

    @staticmethod
    def detect(raw):
        """ Detect if raw content is JSON Schema

        Parameters:
            raw (bytes): data to check for JSON Schema

        Returns:
            boolean: True if content represents JSON Schema by checking
                for "$schema"; False otherwise
        """
        return b'"$schema"' in raw

    @staticmethod
    def verify_schema(schema):
        """ Validate schema integrity

        Returns True if the provided schema validates successfully
        as compliant Draft 7 JSON Schema, False otherwise

        Parameters:
            schema (dict): is the JSON schema to verify
        """
        try:
            jsonschema.Draft7Validator.check_schema(schema=schema)
        except jsonschema.exceptions.SchemaError:
            return False

        return True

    def verify_json(self, schema=b'', raw=b''):
        """ Verify the raw content against the schema for JSON that conforms to the schema

        Parameters:
            schema (dict): is the schema used for validation; it is handed to
                jsonschema.validate as given
            raw (bytes): is JSON to validate against the Schema

        Returns:
            boolean: True if the JSON passes validation against the
                provided schema

        Raises:
            ValidationError: raw is not valid JSON, schema is not valid JSON
                Schema, the validation fails, or any other exception occurs
                while validating. The original exception is chained as the
                cause.
        """
        try:
            d = json.loads(raw)
            kwargs = dict()
            if self.resolver is not None:
                # Registry lets jsonschema resolve $ref through the resolver
                kwargs["registry"] = self.resolver.resolver(scer=raw)
            jsonschema.validate(instance=d, schema=schema, **kwargs)
        except jsonschema.exceptions.ValidationError as ex:
            raise ValidationError(f'Credential validation exception: {ex}') from ex
        except jsonschema.exceptions.SchemaError as ex:
            raise ValidationError(f'Schema exception: {ex}') from ex
        except json.decoder.JSONDecodeError as ex:
            raise ValidationError(f"Credential JSON exception: {ex}") from ex
        except Exception as ex:
            raise ValidationError(f"Credential Exception: {ex}") from ex

        return True
[docs]
class Schemer:
    """ Schemer is KERI schema serializer-deserializer class

    Verifies self-addressing identifier based on schema type

    Has the following public properties:

    Properties:
        .raw (bytes): of serialized schema only
        .sed (dict): schema dict
        .kind (str): serialization kind value (see coring.Kinds)
        .saider (Saider): instance of self-addressing identifier
        .said (qb64): digest from .saider

    Attributes:
        .typ (JSONSchema): schema type instance used to load, dump and verify

    Hidden Attributes:
        ._raw (bytes): of serialized schema only
        ._sed (dict): schema dict
        ._kind (str): serialization kind value
        ._code (str): digest code used when deriving .saider
        ._saider (Saider): instance of self-addressing identifier of the schema
    """

    def __init__(self, raw=b'', sed=None, kind=None, typ=JSONSchema(),
                 code=MtrDex.Blake3_256, verify=True):
        """ Initialize instance of Schemer

        Deserialize if raw provided
        Serialize if sed provided but not raw

        Parameters:
            raw (bytes): of serialized schema
            sed (dict): dict or None
                if None its deserialized from raw
            kind (serialization): kind string value or None; only used when
                serializing from sed
            typ (JSONSchema): type of schema; only used when serializing from
                sed, when raw is provided the type is determined from raw.
                NOTE(review): the default instance is created once at
                definition time and shared by every call that omits typ.
            code (MtrDex): default digest code
            verify (bool): True means check the resulting schema dict with the
                                schema type's verify_schema.
                                Raises ValidationError if that check fails
                           False means skip the check. Useful to avoid unnecessary
                                reverification when deserializing from database
                                as opposed to over the wire reception.

        Raises:
            ValueError: neither raw nor sed was provided
            ValidationError: verify is True and the schema check fails
        """
        self._code = code
        if raw:
            self.raw = raw  # raw property setter does the deserialization
        elif sed:
            self.typ = typ
            self._kind = kind
            self.sed = sed  # sed property setter does the serialization
        else:
            raise ValueError("Improper initialization need raw or sed.")

        if verify and not self._verify_schema():
            raise ValidationError("invalid kind {} for schema {}"
                                  "".format(self.kind, self.sed))

    def _inhale(self, raw):
        """
        Loads type specific Schema sed and verifies the self-addressing identifier
        of the raw content. Sets .typ from the sniffed schema type.

        Parameters:
            raw (bytes): serialized schema to load

        Returns:
            tuple: (sed, kind, saider) as returned by the schema type's load
        """
        self.typ = self._sniff(raw)
        sed, kind, saider = self.typ.load(raw=raw)
        return sed, kind, saider

    def _exhale(self, sed, kind=None):
        """ Dumps type specific Schema and returns the raw bytes, sed, kind and saider

        Derives the self-addressing identifier of sed and writes it into the
        ID field of sed (sed is modified in place) before serializing.

        Parameters:
            sed (dict): schema dict to serialize
            kind (str): serialization kind, returned to the caller unchanged

        Returns:
            tuple: (raw, sed, kind, saider)
        """
        saider = Saider(sad=sed, code=self._code, label=self.typ.id_)
        sed[self.typ.id_] = saider.qb64
        # NOTE(review): kind is not forwarded to dump, so dump's default
        # serialization kind is always used - confirm this is intended.
        raw = self.typ.dump(sed)

        return raw, sed, kind, saider

    @staticmethod
    def _sniff(raw):
        """ Determine type of schema from raw bytes

        Parameters:
            raw (bytes): serialized schema

        Returns:
            JSONSchema: new schema type instance
        """
        # JSONSchema is the only supported schema type, so it is returned
        # whether or not raw contains a '"$schema"' marker.
        return JSONSchema()

    @property
    def raw(self):
        """ raw property getter """
        return self._raw

    @raw.setter
    def raw(self, raw):
        """ raw property setter, deserializes raw and updates sed, kind and saider """
        sed, kind, saider = self._inhale(raw=raw)
        self._raw = bytes(raw)  # crypto ops require bytes not bytearray
        self._sed = sed
        self._kind = kind
        self._saider = saider

    @property
    def sed(self):
        """ sed property getter """
        return self._sed

    @sed.setter
    def sed(self, sed):
        """ sed property setter assumes ._kind """
        raw, sed, kind, saider = self._exhale(sed=sed, kind=self._kind)
        self._raw = raw
        self._kind = kind
        self._sed = sed
        self._saider = saider

    @property
    def kind(self):
        """ kind property getter """
        return self._kind

    @kind.setter
    def kind(self, kind):
        """ kind property setter assumes ._sed """
        # _exhale returns (raw, sed, kind, saider); unpack in that order so
        # the schema dict and the kind are not swapped.
        raw, sed, kind, saider = self._exhale(sed=self._sed, kind=kind)
        self._raw = raw
        self._sed = sed
        self._kind = kind
        # Keep the saider derived from sed by _exhale, as the sed setter does.
        self._saider = saider

    @property
    def saider(self):
        """ saider property getter """
        return self._saider

    @property
    def said(self):
        """ said property getter, relies on saider """
        return self.saider.qb64

    def verify(self, raw=b''):
        """
        Returns True if raw validates against this schema (.sed) using the
        schema type's verify_json. Any exception raised by verify_json
        propagates to the caller.

        Parameters:
            raw (bytes): is serialised JSON content to verify against schema
        """
        return self.typ.verify_json(schema=self.sed, raw=raw)

    def pretty(self, *, size=1024):
        """
        Returns str JSON of .sed with pretty formatting

        Parameters:
            size (int): maximum number of characters returned; None means
                no limit. Default keeps output small for uses such as
                syslog UDP MCU with ogler.logger
        """
        # Slicing with None as the stop returns the whole string.
        return json.dumps(self.sed, indent=1)[:size]

    def _verify_schema(self):
        """
        Returns True if .sed passes the schema type's verify_schema check,
        False otherwise
        """
        return self.typ.verify_schema(schema=self.sed)