# -*- encoding: utf-8 -*-
"""
keri.core.scheming module
self-addressing and schema support
"""
import json
import cbor2 as cbor
import jsonschema
import msgpack
from . import coring
from .coring import MtrDex, Serials, Saider, Saids
from .. import help, kering
from ..kering import ValidationError, DeserializeError
logger = help.ogler.getLogger()
class CacheResolver:
    """ Sample jsonschema resolver for loading schema $ref references from a local store.

    Schemas are written to and read from the ``schema`` sub-database of the
    database instance supplied at construction.
    """

    def __init__(self, db):
        """ Create a jsonschema resolver backed by a local schema store

        Parameters:
            db (Baser): database instance to store and retrieve json schema SADs
        """
        self.db = db

    def add(self, key, schema):
        """ Add schema to cache for resolution

        The schema is parsed into a Schemer and stored only when the said of
        that Schemer equals key. On a mismatch nothing is stored and no error
        is raised.

        Parameters:
            key (str): URI to resolve to the schema
            schema (bytes): bytes of the schema for the URI
        """
        candidate = Schemer(raw=schema)
        if candidate.said == key:
            self.db.schema.pin(key, candidate)

    def resolve(self, uri):
        """ Look up the raw serialization of a stored schema

        Parameters:
            uri (str): key the schema was stored under

        Returns:
            bytes: the .raw of the stored schema, or None when nothing is
                stored under uri
        """
        found = self.db.schema.get(uri)
        return None if found is None else found.raw

    def handler(self, uri):
        """ Handler provided to jsonschema for cache resolution

        Only the part of uri after its last ":" is used as the lookup key;
        a uri without any ":" is used whole.

        Parameters:
            uri (str): the URI to resolve

        Returns:
            dict: the .sed of the stored schema, or None when no schema is found
        """
        _, _, key = uri.rpartition(":")
        cached = self.db.schema.get(key)
        return cached.sed if cached else None

    def resolver(self, scer=b''):
        """ Locally cached schema resolver

        Returns a jsonschema resolver for returning locally cached schema based on
        self-addressing identifier URIs. The "did" URI scheme is routed to .handler.

        Parameters:
            scer (Optional(bytes)): the source document that is being processed
                for reference resolution
        """
        scheme_handlers = {"did": self.handler}
        return jsonschema.RefResolver("", scer, handlers=scheme_handlers)
class JSONSchema:
    """ JSON Schema support class

    Loads, dumps and validates JSON Schema documents whose self-addressing
    identifier is carried in the field named by .id_.

    Attributes:
        resolver: optional instance used to resolve external schema references
    """
    id_ = Saids.dollar  # ID Field Label

    def __init__(self, resolver=None):
        """ Initialize instance

        Parameters:
            resolver(Optional(Resolver)): instance used by JSONSchema parsing to
                resolve external refs
        """
        self.resolver = resolver

    def resolve(self, uri):
        """ Resolve remote reference to schema

        Parameters:
            uri (str): uniform resource identifier of schema to load

        Returns:
            whatever the configured resolver's resolve(uri) returns, or None
            when no resolver is configured
        """
        if self.resolver is None:
            return None
        return self.resolver.resolve(uri)

    def load(self, raw, kind=Serials.json):
        """ Schema loader

        Loads schema based on kind by performing deserialization on raw bytes
        of schema, then verifies the self-addressing identifier found in the
        .id_ field against the deserialized content.

        Parameters:
            raw (bytes): raw serialized schema
            kind (Optional(Serials)): serialization kind of schema raw content

        Returns:
            tuple: (dict, Serials, Saider) of schema

        Raises:
            DeserializeError: raw cannot be deserialized as kind
            ValueError: kind is not one of the supported serialization kinds
            ValidationError: the deserialized schema is not a mapping, has no
                .id_ field, or its self-addressing identifier does not verify
        """
        if kind == Serials.json:
            try:
                sed = json.loads(raw.decode("utf-8"))
            except Exception as ex:
                # Decode leniently for the message: strict decoding here would
                # raise again on non-UTF-8 input and mask the DeserializeError.
                shown = (raw.decode("utf-8", errors="replace")
                         if isinstance(raw, (bytes, bytearray)) else raw)
                raise DeserializeError("Error deserializing JSON: {} {}"
                                       "".format(shown, ex)) from ex

        elif kind == Serials.mgpk:
            try:
                sed = msgpack.loads(raw)
            except Exception as ex:
                raise DeserializeError("Error deserializing MGPK: {} {}"
                                       "".format(raw, ex)) from ex

        elif kind == Serials.cbor:
            try:
                sed = cbor.loads(raw)
            except Exception as ex:
                raise DeserializeError("Error deserializing CBOR: {} {}"
                                       "".format(raw, ex)) from ex

        else:
            raise ValueError("Invalid serialization kind = {}".format(kind))

        # A document that deserializes to something other than a mapping
        # (number, list, string, ...) cannot carry an ID field.
        if not isinstance(sed, dict) or self.id_ not in sed:
            raise ValidationError("missing ID field {} in schema = {}"
                                  "".format(self.id_, sed))

        said = sed[self.id_]
        saider = Saider(qb64=said, label=self.id_)
        if not saider.verify(sed, prefixed=True, kind=kind, label=self.id_):
            raise ValidationError("invalid self-addressing identifier {} instead of {} in schema = {}"
                                  "".format(said, saider.qb64, sed))

        return sed, kind, saider

    @staticmethod
    def dump(sed, kind=Serials.json):
        """ Serialize schema based on kind

        Parameters:
            sed (dict): in memory representation of schema
            kind (Optional(Serials)): kind of serialization to perform. Defaults to JSON

        Returns:
            bytes: Serialized schema
        """
        return coring.dumps(sed, kind)

    @staticmethod
    def detect(raw):
        """ Detect if raw content is JSON Schema

        Parameters:
            raw (bytes): data to check for JSON Schema

        Returns:
            boolean: True if content represents JSON Schema by checking
                for "$schema"; False otherwise
        """
        return b'"$schema"' in raw

    @staticmethod
    def verify_schema(schema):
        """ Validate schema integrity

        Returns True if the provided schema validates successfully
        as compliant Draft 7 JSON Schema, False otherwise

        Parameters:
            schema (dict): is the JSON schema to verify
        """
        try:
            jsonschema.Draft7Validator.check_schema(schema=schema)
        except jsonschema.exceptions.SchemaError:
            return False

        return True

    def verify_json(self, schema=b'', raw=b''):
        """ Verify the raw content against the schema for JSON that conforms to the schema

        Parameters:
            schema: the schema used for validation, passed through unchanged
                to jsonschema.validate (Schemer.verify passes its schema dict)
            raw (bytes): is JSON to validate against the schema

        Returns:
            boolean: True if the JSON passes validation against the schema.
                Failure is never reported by returning False.

        Raises:
            kering.ValidationError: raw is not valid JSON, schema is not valid
                JSON Schema, validation fails, or any other error occurs
                while validating. The original exception is chained as the cause.
        """
        try:
            d = json.loads(raw)
            kwargs = dict()
            if self.resolver is not None:
                # NOTE(review): the instance bytes (raw), not the schema, are
                # handed over as the resolver's source document - confirm
                # this is intended.
                kwargs["resolver"] = self.resolver.resolver(scer=raw)
            jsonschema.validate(instance=d, schema=schema, **kwargs)
        except jsonschema.exceptions.ValidationError as ex:
            raise kering.ValidationError(f'Credential validation exception: {ex}') from ex
        except jsonschema.exceptions.SchemaError as ex:
            raise kering.ValidationError(f'Schema exception: {ex}') from ex
        except json.decoder.JSONDecodeError as ex:
            raise kering.ValidationError(f"Credential JSON exception: {ex}") from ex
        except Exception as ex:
            # Catch-all so callers only ever have to handle kering.ValidationError
            raise kering.ValidationError(f"Credential Exception: {ex}") from ex

        return True
class Schemer:
    """ Schemer is KERI schema serializer-deserializer class

    Verifies self-addressing identifier based on schema type

    Has the following public properties:

    Properties:
        .raw (bytes): of serialized schema only
        .sed (dict): schema dict
        .kind (Schema): kind string value (see namedtuple coring.Serials)
        .saider (Saider): instance of self-addressing identifier
        .said (qb64): digest from .saider

    Attributes:
        .typ (JSONSchema): schema type handler used to load, dump and verify

    Hidden Attributes:
        ._raw (bytes): of serialized schema only
        ._sed (JSON): schema dict
        ._kind (schema): kind string value (see namedtuple coring.Serials)
        ._code (default): code for .saider
        ._saider (Saider): instance of self-addressing identifier of the schema
    """

    def __init__(self, raw=b'', sed=None, kind=None, typ=None, code=MtrDex.Blake3_256):
        """ Initialize instance of Schemer

        Deserialize if raw provided
        Serialize if sed provided but not raw

        Parameters:
            raw (bytes): of serialized schema
            sed (dict): dict or None
                if None its deserialized from raw
            kind (serialization): kind string value or None (see namedtuple coring.Serials)
            typ (JSONSchema): type of schema, only used when serializing from sed;
                when deserializing from raw the type is sniffed from raw.
                Defaults to a new JSONSchema instance.
            code (MtrDex): default digest code

        Raises:
            ValueError: neither raw nor sed was provided
            ValidationError: the resulting schema dict fails schema verification
        """
        self._code = code
        if raw:
            self.raw = raw  # raw property setter does the deserialization
        elif sed:
            # Build the default per instance so instances never share one
            # handler object created at function definition time.
            self.typ = typ if typ is not None else JSONSchema()
            self._kind = kind
            self.sed = sed  # sed property setter does the serialization
        else:
            raise ValueError("Improper initialization need raw or sed.")

        if not self._verify_schema():
            raise ValidationError("invalid kind {} for schema {}"
                                  "".format(self.kind, self.sed))

    def _inhale(self, raw):
        """ Loads type specific schema from raw and verifies its self-addressing identifier

        Sets .typ from the content of raw as a side effect.

        Parameters:
            raw (bytes): serialized schema to load

        Returns:
            tuple: (sed, kind, saider) as returned by the load of .typ
        """
        self.typ = self._sniff(raw)
        sed, kind, saider = self.typ.load(raw=raw)

        return sed, kind, saider

    def _exhale(self, sed, kind=None):
        """ Dumps type specific schema and returns the raw bytes, sed, kind and saider

        The self-addressing identifier is computed from sed and written into
        sed in place under the ID field label of .typ before dumping.

        Parameters:
            sed (dict): schema dict to serialize; modified in place
            kind (Schema): serialization kind, returned unchanged

        Returns:
            tuple: (raw, sed, kind, saider)
        """
        saider = Saider(sad=sed, code=self._code, label=self.typ.id_)
        sed[self.typ.id_] = saider.qb64
        # NOTE(review): kind is not forwarded to dump, so the default
        # serialization of .typ.dump is always used - confirm intended.
        raw = self.typ.dump(sed)

        return raw, sed, kind, saider

    @staticmethod
    def _sniff(raw):
        """ Determine type of schema from raw bytes

        Parameters:
            raw (bytes): serialized schema

        Returns:
            JSONSchema: new schema type handler instance
        """
        if JSONSchema.detect(raw):
            return JSONSchema()

        # Default for now is JSONSchema because we don't support any other
        return JSONSchema()

    @property
    def raw(self):
        """ raw property getter """
        return self._raw

    @raw.setter
    def raw(self, raw):
        """ raw property setter, deserializes raw and updates sed, kind and saider """
        sed, kind, saider = self._inhale(raw=raw)
        self._raw = bytes(raw)  # crypto ops require bytes not bytearray
        self._sed = sed
        self._kind = kind
        self._saider = saider

    @property
    def sed(self):
        """ sed property getter """
        return self._sed

    @sed.setter
    def sed(self, sed):
        """ sed property setter, assumes ._kind; updates raw and saider """
        raw, sed, kind, saider = self._exhale(sed=sed, kind=self._kind)
        self._raw = raw
        self._kind = kind
        self._sed = sed
        self._saider = saider

    @property
    def kind(self):
        """ kind property getter """
        return self._kind

    @kind.setter
    def kind(self, kind):
        """ kind property setter, assumes ._sed; updates raw and saider """
        # Unpack in the order _exhale returns: (raw, sed, kind, saider)
        raw, sed, kind, saider = self._exhale(sed=self._sed, kind=kind)
        self._raw = raw
        self._sed = sed
        self._kind = kind
        # Use the saider computed by _exhale, same as the sed setter does
        self._saider = saider

    @property
    def saider(self):
        """ saider property getter """
        return self._saider

    @property
    def said(self):
        """ said property getter, relies on saider """
        return self.saider.qb64

    def verify(self, raw=b''):
        """ Verify serialized JSON content against this schema

        Delegates to the verify_json of .typ with .sed as the schema.

        Parameters:
            raw (bytes): is serialised JSON content to verify against schema

        Returns:
            whatever the verify_json of .typ returns
        """
        return self.typ.verify_json(schema=self.sed, raw=raw)

    def pretty(self, *, size=1024):
        """ Returns str JSON of .sed with pretty formatting

        Parameters:
            size (int | None): maximum number of characters returned,
                None means no limit

        ToDo: add default size limit on pretty when used for syslog UDP MCU
        like 1024 for ogler.logger
        """
        # slicing with None as the stop value returns the whole string
        return json.dumps(self.sed, indent=1)[:size]

    def _verify_schema(self):
        """ Returns the result of the verify_schema of .typ applied to .sed """
        return self.typ.verify_schema(schema=self.sed)