from __future__ import absolute_import

from ast import literal_eval
from copy import copy
from datetime import date, datetime
import re
from warnings import warn

from cerberus import errors
from cerberus.platform import (
    _int_types,
    _str_type,
    Container,
    Hashable,
    Iterable,
    Mapping,
    Sequence,
    Sized,
)
from cerberus.schema import (
    schema_registry,
    rules_set_registry,
    DefinitionSchema,
    SchemaError,
)
from cerberus.utils import drop_item_from_tuple, readonly_classproperty, TypeDefinition

toy_error_handler = errors.ToyErrorHandler()


def dummy_for_rule_validation(rule_constraints):
    def dummy(self, constraint, field, value):
        raise RuntimeError(
            'Dummy method called. Its purpose is to hold just '
            'validation constraints for a rule in its '
            'docstring.'
        )

    f = dummy
    f.__doc__ = rule_constraints
    return f


class DocumentError(Exception):
    """Raised when the target document is missing or has the wrong format"""

    pass


class _SchemaRuleTypeError(Exception):
    """
    Raised when a schema (list) validation encounters a mapping.
    Not supposed to be used outside this module.
    """

    pass


class BareValidator(object):
    """
    Validator class. Normalizes and/or validates any mapping against a
    validation-schema which is provided as an argument at class instantiation
    or upon calling the :meth:`~cerberus.Validator.validate`,
    :meth:`~cerberus.Validator.validated` or
    :meth:`~cerberus.Validator.normalized` method. An instance itself is
    callable and executes a validation.

    All instantiation parameters are optional.

    There are the introspective properties :attr:`types`, :attr:`validators`,
    :attr:`coercers`, :attr:`default_setters`, :attr:`rules`,
    :attr:`normalization_rules` and :attr:`validation_rules`.

    The attributes reflecting the available rules are assembled considering
    constraints that are defined in the docstrings of rules' methods and are
    effectively used as validation schema for :attr:`schema`.

    :param schema: See :attr:`~cerberus.Validator.schema`.
                   Defaults to :obj:`None`.
    :type schema: any :term:`mapping`
    :param ignore_none_values: See :attr:`~cerberus.Validator.ignore_none_values`.
                               Defaults to ``False``.
    :type ignore_none_values: :class:`bool`
    :param allow_unknown: See :attr:`~cerberus.Validator.allow_unknown`.
                          Defaults to ``False``.
    :type allow_unknown: :class:`bool` or any :term:`mapping`
    :param require_all: See :attr:`~cerberus.Validator.require_all`.
                        Defaults to ``False``.
    :type require_all: :class:`bool`
    :param purge_unknown: See :attr:`~cerberus.Validator.purge_unknown`.
                          Defaults to ``False``.
    :type purge_unknown: :class:`bool`
    :param purge_readonly: Removes all fields that are defined as ``readonly`` in the
                           normalization phase.
    :type purge_readonly: :class:`bool`
    :param error_handler: The error handler that formats the result of
                          :attr:`~cerberus.Validator.errors`.
                          When given as two-value tuple with an error-handler
                          class and a dictionary, the latter is passed to the
                          initialization of the error handler.
                          Default: :class:`~cerberus.errors.BasicErrorHandler`.
    :type error_handler: class or instance based on
                         :class:`~cerberus.errors.BaseErrorHandler` or
                         :class:`tuple`
    """  # noqa: E501
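    # A minimal usage sketch (not part of the library): the public ``Validator``
    # class assembled at the bottom of this module is what applications import.
    # The schema and document below are hypothetical examples.
    #
    #     from cerberus import Validator
    #
    #     schema = {'name': {'type': 'string', 'required': True},
    #               'age': {'type': 'integer', 'min': 0}}
    #     v = Validator(schema)
    #     v.validate({'name': 'Ada', 'age': 36})   # -> True
    #     v.validate({'age': -1})                  # -> False
    #     v.errors  # formatted by the configured error handler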
    mandatory_validations = ('nullable',)
    """
    Rules that are evaluated on any field, regardless of whether they are defined in
    the schema or not.
    Type: :class:`tuple`
    """
    priority_validations = ('nullable', 'readonly', 'type', 'empty')
    """
    Rules that will be processed in this order before any others.
    Type: :class:`tuple`
    """
    types_mapping = {
        'binary': TypeDefinition('binary', (bytes, bytearray), ()),
        'boolean': TypeDefinition('boolean', (bool,), ()),
        'container': TypeDefinition('container', (Container,), (_str_type,)),
        'date': TypeDefinition('date', (date,), ()),
        'datetime': TypeDefinition('datetime', (datetime,), ()),
        'dict': TypeDefinition('dict', (Mapping,), ()),
        'float': TypeDefinition('float', (float, _int_types), ()),
        'integer': TypeDefinition('integer', (_int_types,), ()),
        'list': TypeDefinition('list', (Sequence,), (_str_type,)),
        'number': TypeDefinition('number', (_int_types, float), (bool,)),
        'set': TypeDefinition('set', (set,), ()),
        'string': TypeDefinition('string', (_str_type,), ()),
    }
    """
    This mapping holds all available constraints for the type rule and their assigned
    :class:`~cerberus.TypeDefinition`.
    """
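    # Subclasses can extend the accepted 'type' constraints by adding entries to
    # ``types_mapping``; a hedged sketch (the 'decimal' key is made up and assumes
    # ``from decimal import Decimal``):
    #
    #     class MyValidator(Validator):
    #         types_mapping = Validator.types_mapping.copy()
    #         types_mapping['decimal'] = TypeDefinition('decimal', (Decimal,), ())
    #
    # ``included_types`` must match via ``isinstance``, while ``excluded_types`` are
    # rejected even when they would otherwise match (see ``_validate_type`` below).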
    _valid_schemas = set()
    """
    A :class:`set` of hashes derived from validation schemas that are legit for a
    particular ``Validator`` class.
    """

    def __init__(self, *args, **kwargs):
        """
        The arguments will be treated as with this signature:

        __init__(self, schema=None, ignore_none_values=False,
                 allow_unknown=False, require_all=False,
                 purge_unknown=False, purge_readonly=False,
                 error_handler=errors.BasicErrorHandler)
        """

        self.document = None
        """ The document that is or was recently processed.
            Type: any :term:`mapping` """
        self._errors = errors.ErrorList()
        """ The list of errors that were encountered since the last document
            processing was invoked.
            Type: :class:`~cerberus.errors.ErrorList` """
        self.recent_error = None
        """ The last individual error that was submitted.
            Type: :class:`~cerberus.errors.ValidationError` """
        self.document_error_tree = errors.DocumentErrorTree()
        """ A tree representation of encountered errors following the
            structure of the document.
            Type: :class:`~cerberus.errors.DocumentErrorTree` """
        self.schema_error_tree = errors.SchemaErrorTree()
        """ A tree representation of encountered errors following the
            structure of the schema.
            Type: :class:`~cerberus.errors.SchemaErrorTree` """
        self.document_path = ()
        """ The path within the document to the current sub-document.
            Type: :class:`tuple` """
        self.schema_path = ()
        """ The path within the schema to the current sub-schema.
            Type: :class:`tuple` """
        self.update = False
        self.error_handler = self.__init_error_handler(kwargs)
        """ The error handler used to format :attr:`~cerberus.Validator.errors`
            and process submitted errors with
            :meth:`~cerberus.Validator._error`.
            Type: :class:`~cerberus.errors.BaseErrorHandler` """
        self.__store_config(args, kwargs)
        self.schema = kwargs.get('schema', None)
        self.allow_unknown = kwargs.get('allow_unknown', False)
        self.require_all = kwargs.get('require_all', False)
        self._remaining_rules = []
        """ Keeps track of the rules that are next in line to be evaluated
            during the validation of a field.
            Type: :class:`list` """

        super(BareValidator, self).__init__()

    @staticmethod
    def __init_error_handler(kwargs):
        error_handler = kwargs.pop('error_handler', errors.BasicErrorHandler)
        if isinstance(error_handler, tuple):
            error_handler, eh_config = error_handler
        else:
            eh_config = {}
        if isinstance(error_handler, type) and issubclass(
            error_handler, errors.BaseErrorHandler
        ):
            return error_handler(**eh_config)
        elif isinstance(error_handler, errors.BaseErrorHandler):
            return error_handler
        else:
            raise RuntimeError('Invalid error_handler.')
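    # The ``error_handler`` argument accepts a handler class, a handler instance,
    # or a ``(handler_class, config_dict)`` tuple; a hedged sketch:
    #
    #     v = Validator(schema, error_handler=errors.BasicErrorHandler)
    #     v = Validator(schema, error_handler=(errors.BasicErrorHandler, {}))
    #
    # Both forms end up as a ``BaseErrorHandler`` instance on ``self.error_handler``.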
    def __store_config(self, args, kwargs):
        """Assign args to kwargs and store configuration."""
        signature = (
            'schema',
            'ignore_none_values',
            'allow_unknown',
            'require_all',
            'purge_unknown',
            'purge_readonly',
        )
        for i, p in enumerate(signature[: len(args)]):
            if p in kwargs:
                raise TypeError("__init__ got multiple values for argument " "'%s'" % p)
            else:
                kwargs[p] = args[i]
        self._config = kwargs
        """ This dictionary holds the configuration arguments that were used to
            initialize the :class:`Validator` instance except the
            ``error_handler``. """

    @classmethod
    def clear_caches(cls):
        """Purge the cache of known valid schemas."""
        cls._valid_schemas.clear()

    def _error(self, *args):
        """
        Creates and adds one or multiple errors.

        :param args: Accepts different argument signatures.

                     *1. Bulk addition of errors:*

                     - :term:`iterable` of
                       :class:`~cerberus.errors.ValidationError`-instances

                     The errors will be added to
                     :attr:`~cerberus.Validator._errors`.

                     *2. Custom error:*

                     - the invalid field's name

                     - the error message

                     A custom error containing the message will be created and
                     added to :attr:`~cerberus.Validator._errors`.
                     It will, however, contain less information than a defined
                     error (no reference to the violated rule and its
                     constraint).

                     *3. Defined error:*

                     - the invalid field's name

                     - the error-reference, see :mod:`cerberus.errors`

                     - arbitrary, supplemental information about the error

                     A :class:`~cerberus.errors.ValidationError` instance will
                     be created and added to
                     :attr:`~cerberus.Validator._errors`.
        """
        if len(args) == 1:
            self._errors.extend(args[0])
            self._errors.sort()
            for error in args[0]:
                self.document_error_tree.add(error)
                self.schema_error_tree.add(error)
                self.error_handler.emit(error)
        elif len(args) == 2 and isinstance(args[1], _str_type):
            self._error(args[0], errors.CUSTOM, args[1])
        elif len(args) >= 2:
            field = args[0]
            code = args[1].code
            rule = args[1].rule
            info = args[2:]

            document_path = self.document_path + (field,)

            schema_path = self.schema_path
            if code != errors.UNKNOWN_FIELD.code and rule is not None:
                schema_path += (field, rule)

            if not rule:
                constraint = None
            else:
                rules_set = self._resolve_rules_set(
                    self._resolve_schema(self.schema)[field]
                )
                if rule == 'nullable':
                    constraint = rules_set.get(rule, False)
                elif rule == 'required':
                    constraint = rules_set.get(rule, self.require_all)
                    if rule not in rules_set:
                        schema_path = "__require_all__"
                else:
                    constraint = rules_set[rule]

            value = self.document.get(field)

            self.recent_error = errors.ValidationError(
                document_path, schema_path, code, rule, constraint, value, info
            )
            self._error([self.recent_error])
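    # ``_error`` call signatures, as a hedged sketch for rule authors (the field
    # name 'foo' is made up):
    #
    #     self._error(child_validator._errors)             # 1. bulk addition
    #     self._error('foo', "must be all lowercase")      # 2. custom message
    #     self._error('foo', errors.BAD_TYPE, 'string')    # 3. defined error
    #
    # Variant 2 is recorded under the ``errors.CUSTOM`` reference; variant 3 carries
    # the violated rule and its constraint into the resulting ``ValidationError``.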
    def _get_child_validator(self, document_crumb=None, schema_crumb=None, **kwargs):
        """
        Creates a new instance of Validator-(sub-)class. All initial parameters of the
        parent are passed to the initialization, unless a parameter is given as an
        explicit *keyword*-parameter.

        :param document_crumb: Extends the
                               :attr:`~cerberus.Validator.document_path`
                               of the child-validator.
        :type document_crumb: :class:`tuple` or :term:`hashable`
        :param schema_crumb: Extends the
                             :attr:`~cerberus.Validator.schema_path`
                             of the child-validator.
        :type schema_crumb: :class:`tuple` or hashable
        :param kwargs: Overriding keyword-arguments for initialization.
        :type kwargs: :class:`dict`

        :return: an instance of ``self.__class__``
        """
        child_config = self._config.copy()
        child_config.update(kwargs)
        if not self.is_child:
            child_config['is_child'] = True
            child_config['error_handler'] = toy_error_handler
            child_config['root_allow_unknown'] = self.allow_unknown
            child_config['root_require_all'] = self.require_all
            child_config['root_document'] = self.document
            child_config['root_schema'] = self.schema

        child_validator = self.__class__(**child_config)

        if document_crumb is None:
            child_validator.document_path = self.document_path
        else:
            if not isinstance(document_crumb, tuple):
                document_crumb = (document_crumb,)
            child_validator.document_path = self.document_path + document_crumb

        if schema_crumb is None:
            child_validator.schema_path = self.schema_path
        else:
            if not isinstance(schema_crumb, tuple):
                schema_crumb = (schema_crumb,)
            child_validator.schema_path = self.schema_path + schema_crumb

        return child_validator

    def __get_rule_handler(self, domain, rule):
        methodname = '_{0}_{1}'.format(domain, rule.replace(' ', '_'))
        result = getattr(self, methodname, None)
        if result is None:
            raise RuntimeError(
                "There's no handler for '{}' in the '{}' "
                "domain.".format(rule, domain)
            )
        return result

    def _drop_nodes_from_errorpaths(self, _errors, dp_items, sp_items):
        """
        Removes nodes by index from an errorpath, relative to the basepaths of self.

        :param _errors: A list of :class:`errors.ValidationError` instances.
        :param dp_items: A list of integers, pointing at the nodes to drop from
                         the :attr:`document_path`.
        :param sp_items: Alike ``dp_items``, but for :attr:`schema_path`.
        """
        dp_basedepth = len(self.document_path)
        sp_basedepth = len(self.schema_path)
        for error in _errors:
            for i in sorted(dp_items, reverse=True):
                error.document_path = drop_item_from_tuple(
                    error.document_path, dp_basedepth + i
                )
            for i in sorted(sp_items, reverse=True):
                error.schema_path = drop_item_from_tuple(
                    error.schema_path, sp_basedepth + i
                )
            if error.child_errors:
                self._drop_nodes_from_errorpaths(error.child_errors, dp_items, sp_items)

    def _lookup_field(self, path):
        """
        Searches for a field as defined by path. This method is used by the
        ``dependency`` evaluation logic.

        :param path: Path elements are separated by a ``.``. A leading ``^``
                     indicates that the path relates to the document root,
                     otherwise it relates to the currently evaluated document,
                     which is possibly a subdocument.
                     The sequence ``^^`` at the start will be interpreted as a
                     literal ``^``.
        :type path: :class:`str`
        :returns: Either the found field name and its value or :obj:`None` for
                  both.
        :rtype: A two-value :class:`tuple`.
        """
        if path.startswith('^'):
            path = path[1:]
            context = self.document if path.startswith('^') else self.root_document
        else:
            context = self.document

        parts = path.split('.')
        for part in parts:
            if part not in context:
                return None, None
            context = context.get(part, {})

        return parts[-1], context
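    # Path semantics of ``_lookup_field``, as a hedged sketch with made-up fields;
    # the method returns ``(last_path_part, value)`` or ``(None, None)``:
    #
    #     self._lookup_field('a.b')    # 'b' under 'a' in the currently evaluated document
    #     self._lookup_field('^a.b')   # the same path, resolved against the root document
    #     self._lookup_field('^^a.b')  # escapes the marker: looks up a key literally named '^a'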
    def _resolve_rules_set(self, rules_set):
        if isinstance(rules_set, Mapping):
            return rules_set
        elif isinstance(rules_set, _str_type):
            return self.rules_set_registry.get(rules_set)
        return None

    def _resolve_schema(self, schema):
        if isinstance(schema, Mapping):
            return schema
        elif isinstance(schema, _str_type):
            return self.schema_registry.get(schema)
        return None

    # Properties

    @property
    def allow_unknown(self):
        """
        If ``True``, unknown fields that are not defined in the schema will be ignored.
        If a mapping with a validation schema is given, any undefined field will be
        validated against its rules. Also see :ref:`allowing-the-unknown`.
        Type: :class:`bool` or any :term:`mapping`
        """
        return self._config.get('allow_unknown', False)

    @allow_unknown.setter
    def allow_unknown(self, value):
        if not (self.is_child or isinstance(value, (bool, DefinitionSchema))):
            DefinitionSchema(self, {'allow_unknown': value})
        self._config['allow_unknown'] = value

    @property
    def require_all(self):
        """
        If ``True``, known fields that are defined in the schema will be required.
        Type: :class:`bool`
        """
        return self._config.get('require_all', False)

    @require_all.setter
    def require_all(self, value):
        self._config['require_all'] = value

    @property
    def errors(self):
        """
        The errors of the last processing formatted by the handler that is bound to
        :attr:`~cerberus.Validator.error_handler`.
        """
        return self.error_handler(self._errors)

    @property
    def ignore_none_values(self):
        """
        Whether :obj:`None` values in a document should be skipped during processing.
        Type: :class:`bool`
        """
        return self._config.get('ignore_none_values', False)

    @ignore_none_values.setter
    def ignore_none_values(self, value):
        self._config['ignore_none_values'] = value

    @property
    def is_child(self):
        """
        ``True`` for child-validators obtained with
        :meth:`~cerberus.Validator._get_child_validator`.
        Type: :class:`bool`
        """
        return self._config.get('is_child', False)

    @property
    def _is_normalized(self):
        """``True`` if the document is already normalized."""
        return self._config.get('_is_normalized', False)

    @_is_normalized.setter
    def _is_normalized(self, value):
        self._config['_is_normalized'] = value

    @property
    def purge_unknown(self):
        """
        If ``True``, unknown fields will be deleted from the document unless a
        validation is called with disabled normalization. Also see
        :ref:`purging-unknown-fields`.
        Type: :class:`bool`
        """
        return self._config.get('purge_unknown', False)

    @purge_unknown.setter
    def purge_unknown(self, value):
        self._config['purge_unknown'] = value

    @property
    def purge_readonly(self):
        """
        If ``True``, fields declared as readonly will be deleted from the document
        unless a validation is called with disabled normalization.
        Type: :class:`bool`
        """
        return self._config.get('purge_readonly', False)

    @purge_readonly.setter
    def purge_readonly(self, value):
        self._config['purge_readonly'] = value

    @property
    def root_allow_unknown(self):
        """
        The :attr:`~cerberus.Validator.allow_unknown` attribute of the first level
        ancestor of a child validator.
        """
        return self._config.get('root_allow_unknown', self.allow_unknown)

    @property
    def root_require_all(self):
        """
        The :attr:`~cerberus.Validator.require_all` attribute of the first level
        ancestor of a child validator.
        """
        return self._config.get('root_require_all', self.require_all)

    @property
    def root_document(self):
        """
        The :attr:`~cerberus.Validator.document` attribute of the first level ancestor
        of a child validator.
        """
        return self._config.get('root_document', self.document)

    @property
    def rules_set_registry(self):
        """
        The registry that holds referenced rules sets.
        Type: :class:`~cerberus.Registry`
        """
        return self._config.get('rules_set_registry', rules_set_registry)

    @rules_set_registry.setter
    def rules_set_registry(self, registry):
        self._config['rules_set_registry'] = registry

    @property
    def root_schema(self):
        """
        The :attr:`~cerberus.Validator.schema` attribute of the first level ancestor of
        a child validator.
        """
        return self._config.get('root_schema', self.schema)

    @property
    def schema(self):
        """
        The validation schema of a validator. When a schema is passed to a method, it
        replaces this attribute.
        Type: any :term:`mapping` or :obj:`None`
        """
        return self._schema

    @schema.setter
    def schema(self, schema):
        if schema is None:
            self._schema = None
        elif self.is_child or isinstance(schema, DefinitionSchema):
            self._schema = schema
        else:
            self._schema = DefinitionSchema(self, schema)

    @property
    def schema_registry(self):
        """
        The registry that holds referenced schemas.
        Type: :class:`~cerberus.Registry`
        """
        return self._config.get('schema_registry', schema_registry)

    @schema_registry.setter
    def schema_registry(self, registry):
        self._config['schema_registry'] = registry

    # FIXME the returned method has the correct docstring, but doesn't appear
    # in the API docs
    @readonly_classproperty
    def types(cls):
        """
        The constraints that can be used for the 'type' rule.
        Type: A tuple of strings.
        """
        redundant_types = set(cls.types_mapping) & set(cls._types_from_methods)
        if redundant_types:
            warn(
                "These types are defined both with a method and in the "
                "'types_mapping' property of this validator: %s" % redundant_types
            )

        return tuple(cls.types_mapping) + cls._types_from_methods

    # Document processing

    def __init_processing(self, document, schema=None):
        self._errors = errors.ErrorList()
        self.recent_error = None
        self.document_error_tree = errors.DocumentErrorTree()
        self.schema_error_tree = errors.SchemaErrorTree()
        self.document = copy(document)
        if not self.is_child:
            self._is_normalized = False

        if schema is not None:
            self.schema = DefinitionSchema(self, schema)
        elif self.schema is None:
            if isinstance(self.allow_unknown, Mapping):
                self._schema = {}
            else:
                raise SchemaError(errors.SCHEMA_ERROR_MISSING)
        if document is None:
            raise DocumentError(errors.DOCUMENT_MISSING)
        if not isinstance(document, Mapping):
            raise DocumentError(errors.DOCUMENT_FORMAT.format(document))
        self.error_handler.start(self)

    def _drop_remaining_rules(self, *rules):
        """
        Drops rules from the queue of the rules that still need to be evaluated for the
        currently processed field. If no arguments are given, the whole queue is
        emptied.
        """
        if rules:
            for rule in rules:
                try:
                    self._remaining_rules.remove(rule)
                except ValueError:
                    pass
        else:
            self._remaining_rules = []

    # # Normalizing

    def normalized(self, document, schema=None, always_return_document=False):
        """
        Returns the document normalized according to the specified rules of a schema.

        :param document: The document to normalize.
        :type document: any :term:`mapping`
        :param schema: The validation schema. Defaults to :obj:`None`. If not
                       provided here, the schema must have been provided at
                       class instantiation.
        :type schema: any :term:`mapping`
        :param always_return_document: Return the document, even if an error
                                       occurred. Defaults to: ``False``.
        :type always_return_document: :class:`bool`
        :return: A normalized copy of the provided mapping or :obj:`None` if an
                 error occurred during normalization.
        """
        self.__init_processing(document, schema)
        self.__normalize_mapping(self.document, self.schema)
        self.error_handler.end(self)
        if self._errors and not always_return_document:
            return None
        else:
            return self.document
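    # Hedged usage sketch for ``normalized`` (schema and documents are made up):
    #
    #     v = Validator({'amount': {'coerce': int, 'default': 0}})
    #     v.normalized({'amount': '5'})   # -> {'amount': 5}
    #     v.normalized({})                # -> {'amount': 0}
    #
    # On a coercion or other normalization error, ``None`` is returned unless
    # ``always_return_document=True`` is passed.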
    def __normalize_mapping(self, mapping, schema):
        if isinstance(schema, _str_type):
            schema = self._resolve_schema(schema)
        schema = schema.copy()
        for field in schema:
            schema[field] = self._resolve_rules_set(schema[field])

        self.__normalize_rename_fields(mapping, schema)
        if self.purge_unknown and not self.allow_unknown:
            self._normalize_purge_unknown(mapping, schema)
        if self.purge_readonly:
            self.__normalize_purge_readonly(mapping, schema)
        # Check `readonly` fields before applying default values because
        # a field's schema definition might contain both `readonly` and
        # `default`.
        self.__validate_readonly_fields(mapping, schema)
        self.__normalize_default_fields(mapping, schema)
        self._normalize_coerce(mapping, schema)
        self.__normalize_containers(mapping, schema)
        self._is_normalized = True
        return mapping

    def _normalize_coerce(self, mapping, schema):
        """
        {'oneof': [
            {'type': 'callable'},
            {'type': 'list',
             'schema': {'oneof': [{'type': 'callable'},
                                  {'type': 'string'}]}},
            {'type': 'string'}
        ]}
        """

        error = errors.COERCION_FAILED
        for field in mapping:
            if field in schema and 'coerce' in schema[field]:
                mapping[field] = self.__normalize_coerce(
                    schema[field]['coerce'],
                    field,
                    mapping[field],
                    schema[field].get('nullable', False),
                    error,
                )
            elif (
                isinstance(self.allow_unknown, Mapping)
                and 'coerce' in self.allow_unknown
            ):
                mapping[field] = self.__normalize_coerce(
                    self.allow_unknown['coerce'],
                    field,
                    mapping[field],
                    self.allow_unknown.get('nullable', False),
                    error,
                )
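    # The 'coerce' rule accepts a callable, the name of a registered coercer
    # method, or a list of either; a hedged schema sketch with made-up fields:
    #
    #     {'amount': {'coerce': int},
    #      'tag': {'coerce': [str.strip, str.lower]}}
    #
    # List entries are applied left to right; processing of a field stops once a
    # coercion for it has failed (see ``__normalize_coerce`` below).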
    def __normalize_coerce(self, processor, field, value, nullable, error):
        if isinstance(processor, _str_type):
            processor = self.__get_rule_handler('normalize_coerce', processor)

        elif isinstance(processor, Iterable):
            result = value
            for p in processor:
                result = self.__normalize_coerce(p, field, result, nullable, error)
                if (
                    errors.COERCION_FAILED
                    in self.document_error_tree.fetch_errors_from(
                        self.document_path + (field,)
                    )
                ):
                    break
            return result

        try:
            return processor(value)
        except Exception as e:
            if not (nullable and value is None):
                self._error(field, error, str(e))
            return value

    def __normalize_containers(self, mapping, schema):
        for field in mapping:
            rules = set(schema.get(field, ()))

            # TODO: This check conflates validation and normalization
            if isinstance(mapping[field], Mapping):
                if 'keysrules' in rules:
                    self.__normalize_mapping_per_keysrules(
                        field, mapping, schema[field]['keysrules']
                    )
                if 'valuesrules' in rules:
                    self.__normalize_mapping_per_valuesrules(
                        field, mapping, schema[field]['valuesrules']
                    )
                if rules & set(
                    ('allow_unknown', 'purge_unknown', 'schema')
                ) or isinstance(self.allow_unknown, Mapping):
                    try:
                        self.__normalize_mapping_per_schema(field, mapping, schema)
                    except _SchemaRuleTypeError:
                        pass

            elif isinstance(mapping[field], _str_type):
                continue

            elif isinstance(mapping[field], Sequence):
                if 'schema' in rules:
                    self.__normalize_sequence_per_schema(field, mapping, schema)
                elif 'items' in rules:
                    self.__normalize_sequence_per_items(field, mapping, schema)

    def __normalize_mapping_per_keysrules(self, field, mapping, property_rules):
        schema = dict(((k, property_rules) for k in mapping[field]))
        document = dict(((k, k) for k in mapping[field]))
        validator = self._get_child_validator(
            document_crumb=field, schema_crumb=(field, 'keysrules'), schema=schema
        )
        result = validator.normalized(document, always_return_document=True)
        if validator._errors:
            self._drop_nodes_from_errorpaths(validator._errors, [], [2, 4])
            self._error(validator._errors)
        for k in result:
            if k == result[k]:
                continue
            if result[k] in mapping[field]:
                warn(
                    "Normalizing keys of {path}: {key} already exists, "
                    "its value is replaced.".format(
                        path='.'.join(str(x) for x in self.document_path + (field,)),
                        key=k,
                    )
                )
                mapping[field][result[k]] = mapping[field][k]
            else:
                mapping[field][result[k]] = mapping[field][k]
                del mapping[field][k]

    def __normalize_mapping_per_valuesrules(self, field, mapping, value_rules):
        schema = dict(((k, value_rules) for k in mapping[field]))
        validator = self._get_child_validator(
            document_crumb=field, schema_crumb=(field, 'valuesrules'), schema=schema
        )
        mapping[field] = validator.normalized(
            mapping[field], always_return_document=True
        )
        if validator._errors:
            self._drop_nodes_from_errorpaths(validator._errors, [], [2])
            self._error(validator._errors)

    def __normalize_mapping_per_schema(self, field, mapping, schema):
        rules = schema.get(field, {})
        if not rules and isinstance(self.allow_unknown, Mapping):
            rules = self.allow_unknown
        validator = self._get_child_validator(
            document_crumb=field,
            schema_crumb=(field, 'schema'),
            schema=rules.get('schema', {}),
            allow_unknown=rules.get('allow_unknown', self.allow_unknown),  # noqa: E501
            purge_unknown=rules.get('purge_unknown', self.purge_unknown),
            require_all=rules.get('require_all', self.require_all),
        )  # noqa: E501
        value_type = type(mapping[field])
        result_value = validator.normalized(mapping[field], always_return_document=True)
        mapping[field] = value_type(result_value)
        if validator._errors:
            self._error(validator._errors)

    def __normalize_sequence_per_schema(self, field, mapping, schema):
        schema = dict(
            ((k, schema[field]['schema']) for k in range(len(mapping[field])))
        )
        document = dict((k, v) for k, v in enumerate(mapping[field]))
        validator = self._get_child_validator(
            document_crumb=field, schema_crumb=(field, 'schema'), schema=schema
        )
        value_type = type(mapping[field])
        result = validator.normalized(document, always_return_document=True)
        mapping[field] = value_type(result.values())
        if validator._errors:
            self._drop_nodes_from_errorpaths(validator._errors, [], [2])
            self._error(validator._errors)

    def __normalize_sequence_per_items(self, field, mapping, schema):
        rules, values = schema[field]['items'], mapping[field]
        if len(rules) != len(values):
            return
        schema = dict(((k, v) for k, v in enumerate(rules)))
        document = dict((k, v) for k, v in enumerate(values))
        validator = self._get_child_validator(
            document_crumb=field, schema_crumb=(field, 'items'), schema=schema
        )
        value_type = type(mapping[field])
        result = validator.normalized(document, always_return_document=True)
        mapping[field] = value_type(result.values())
        if validator._errors:
            self._drop_nodes_from_errorpaths(validator._errors, [], [2])
            self._error(validator._errors)

    @staticmethod
    def __normalize_purge_readonly(mapping, schema):
        for field in [x for x in mapping if schema.get(x, {}).get('readonly', False)]:
            mapping.pop(field)
        return mapping

    @staticmethod
    def _normalize_purge_unknown(mapping, schema):
        """{'type': 'boolean'}"""
        for field in [x for x in mapping if x not in schema]:
            mapping.pop(field)
        return mapping

    def __normalize_rename_fields(self, mapping, schema):
        for field in tuple(mapping):
            if field in schema:
                self._normalize_rename(mapping, schema, field)
                self._normalize_rename_handler(mapping, schema, field)
            elif (
                isinstance(self.allow_unknown, Mapping)
                and 'rename_handler' in self.allow_unknown
            ):
                self._normalize_rename_handler(
                    mapping, {field: self.allow_unknown}, field
                )
        return mapping

    def _normalize_rename(self, mapping, schema, field):
        """{'type': 'hashable'}"""
        if 'rename' in schema[field]:
            mapping[schema[field]['rename']] = mapping[field]
            del mapping[field]

    def _normalize_rename_handler(self, mapping, schema, field):
        """
        {'oneof': [
            {'type': 'callable'},
            {'type': 'list',
             'schema': {'oneof': [{'type': 'callable'},
                                  {'type': 'string'}]}},
            {'type': 'string'}
        ]}
        """
        if 'rename_handler' not in schema[field]:
            return
        new_name = self.__normalize_coerce(
            schema[field]['rename_handler'], field, field, False, errors.RENAMING_FAILED
        )
        if new_name != field:
            mapping[new_name] = mapping[field]
            del mapping[field]

    def __validate_readonly_fields(self, mapping, schema):
        for field in (
            x
            for x in schema
            if x in mapping and self._resolve_rules_set(schema[x]).get('readonly')
        ):
            self._validate_readonly(schema[field]['readonly'], field, mapping[field])

    def __normalize_default_fields(self, mapping, schema):
        empty_fields = [
            x
            for x in schema
            if x not in mapping
            or (
                mapping[x] is None  # noqa: W503
                and not schema[x].get('nullable', False)
            )  # noqa: W503
        ]

        try:
            fields_with_default = [x for x in empty_fields if 'default' in schema[x]]
        except TypeError:
            raise _SchemaRuleTypeError
        for field in fields_with_default:
            self._normalize_default(mapping, schema, field)

        known_fields_states = set()
        fields_with_default_setter = [
            x for x in empty_fields if 'default_setter' in schema[x]
        ]
        while fields_with_default_setter:
            field = fields_with_default_setter.pop(0)
            try:
                self._normalize_default_setter(mapping, schema, field)
            except KeyError:
                fields_with_default_setter.append(field)
            except Exception as e:
                self._error(field, errors.SETTING_DEFAULT_FAILED, str(e))

            fields_processing_state = hash(tuple(fields_with_default_setter))
            if fields_processing_state in known_fields_states:
                for field in fields_with_default_setter:
                    self._error(
                        field,
                        errors.SETTING_DEFAULT_FAILED,
                        'Circular dependencies of default setters.',
                    )
                break
            else:
                known_fields_states.add(fields_processing_state)

    def _normalize_default(self, mapping, schema, field):
        """{'nullable': True}"""
        mapping[field] = schema[field]['default']

    def _normalize_default_setter(self, mapping, schema, field):
        """
        {'oneof': [
            {'type': 'callable'},
            {'type': 'string'}
        ]}
        """
        if 'default_setter' in schema[field]:
            setter = schema[field]['default_setter']
            if isinstance(setter, _str_type):
                setter = self.__get_rule_handler('normalize_default_setter', setter)
            mapping[field] = setter(mapping)

    # # Validating

    def validate(self, document, schema=None, update=False, normalize=True):
"""
|
|
Normalizes and validates a mapping against a validation-schema of defined rules.
|
|
|
|
:param document: The document to normalize.
|
|
:type document: any :term:`mapping`
|
|
:param schema: The validation schema. Defaults to :obj:`None`. If not
|
|
provided here, the schema must have been provided at
|
|
class instantiation.
|
|
:type schema: any :term:`mapping`
|
|
:param update: If ``True``, required fields won't be checked.
|
|
:type update: :class:`bool`
|
|
:param normalize: If ``True``, normalize the document before validation.
|
|
:type normalize: :class:`bool`
|
|
|
|
:return: ``True`` if validation succeeds, otherwise ``False``. Check
|
|
the :func:`errors` property for a list of processing errors.
|
|
:rtype: :class:`bool`
|
|
"""
|
|
self.update = update
|
|
self._unrequired_by_excludes = set()
|
|
|
|
self.__init_processing(document, schema)
|
|
if normalize:
|
|
self.__normalize_mapping(self.document, self.schema)
|
|
|
|
for field in self.document:
|
|
if self.ignore_none_values and self.document[field] is None:
|
|
continue
|
|
definitions = self.schema.get(field)
|
|
if definitions is not None:
|
|
self.__validate_definitions(definitions, field)
|
|
else:
|
|
self.__validate_unknown_fields(field)
|
|
|
|
if not self.update:
|
|
self.__validate_required_fields(self.document)
|
|
|
|
self.error_handler.end(self)
|
|
|
|
return not bool(self._errors)
|
|
|
|
__call__ = validate
|
|
|
|
def validated(self, *args, **kwargs):
|
|
"""
|
|
Wrapper around :meth:`~cerberus.Validator.validate` that returns the normalized
|
|
and validated document or :obj:`None` if validation failed.
|
|
"""
|
|
always_return_document = kwargs.pop('always_return_document', False)
|
|
self.validate(*args, **kwargs)
|
|
if self._errors and not always_return_document:
|
|
return None
|
|
else:
|
|
return self.document
|
|
|
|
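    # Hedged sketch of the three validation entry points (the schema is made up):
    #
    #     v = Validator({'name': {'type': 'string', 'required': True}})
    #     v({'name': 'Ada'})             # __call__ -> validate -> True
    #     v.validated({'name': 'Ada'})   # -> normalized, validated document
    #     v.validated({'name': 1})       # -> None, details in v.errors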
    def __validate_unknown_fields(self, field):
        if self.allow_unknown:
            value = self.document[field]
            if isinstance(self.allow_unknown, (Mapping, _str_type)):
                # validate that unknown fields match the schema
                # defined for unknown fields
                schema_crumb = 'allow_unknown' if self.is_child else '__allow_unknown__'
                validator = self._get_child_validator(
                    schema_crumb=schema_crumb, schema={field: self.allow_unknown}
                )
                if not validator({field: value}, normalize=False):
                    self._error(validator._errors)
        else:
            self._error(field, errors.UNKNOWN_FIELD)

    def __validate_definitions(self, definitions, field):
        """Validate a field's value against its defined rules."""

        def validate_rule(rule):
            validator = self.__get_rule_handler('validate', rule)
            return validator(definitions.get(rule, None), field, value)

        definitions = self._resolve_rules_set(definitions)
        value = self.document[field]

        rules_queue = [
            x
            for x in self.priority_validations
            if x in definitions or x in self.mandatory_validations
        ]
        rules_queue.extend(
            x for x in self.mandatory_validations if x not in rules_queue
        )
        rules_queue.extend(
            x
            for x in definitions
            if x not in rules_queue
            and x not in self.normalization_rules
            and x not in ('allow_unknown', 'require_all', 'meta', 'required')
        )
        self._remaining_rules = rules_queue

        while self._remaining_rules:
            rule = self._remaining_rules.pop(0)
            try:
                result = validate_rule(rule)
                # TODO remove on next breaking release
                if result:
                    break
            except _SchemaRuleTypeError:
                break

        self._drop_remaining_rules()

    # Remember to keep the validation methods below this line
    # sorted alphabetically

    _validate_allow_unknown = dummy_for_rule_validation(
        """ {'oneof': [{'type': 'boolean'},
                       {'type': ['dict', 'string'],
                        'check_with': 'bulk_schema'}]} """
    )

    def _validate_allowed(self, allowed_values, field, value):
        """{'type': 'container'}"""
        if isinstance(value, Iterable) and not isinstance(value, _str_type):
            unallowed = tuple(x for x in value if x not in allowed_values)
            if unallowed:
                self._error(field, errors.UNALLOWED_VALUES, unallowed)
        else:
            if value not in allowed_values:
                self._error(field, errors.UNALLOWED_VALUE, value)

    def _validate_check_with(self, checks, field, value):
        """
        {'oneof': [
            {'type': 'callable'},
            {'type': 'list',
             'schema': {'oneof': [{'type': 'callable'},
                                  {'type': 'string'}]}},
            {'type': 'string'}
        ]}
        """
        if isinstance(checks, _str_type):
            try:
                value_checker = self.__get_rule_handler('check_with', checks)
            # TODO remove on next major release
            except RuntimeError:
                value_checker = self.__get_rule_handler('validator', checks)
                warn(
                    "The 'validator' rule was renamed to 'check_with'. Please update "
                    "your schema and method names accordingly.",
                    DeprecationWarning,
                )
            value_checker(field, value)
        elif isinstance(checks, Iterable):
            for v in checks:
                self._validate_check_with(v, field, value)
        else:
            checks(field, value, self._error)

    def _validate_contains(self, expected_values, field, value):
        """{'empty': False }"""
        if not isinstance(value, Iterable):
            return

        if not isinstance(expected_values, Iterable) or isinstance(
            expected_values, _str_type
        ):
            expected_values = set((expected_values,))
        else:
            expected_values = set(expected_values)

        missing_values = expected_values - set(value)
        if missing_values:
            self._error(field, errors.MISSING_MEMBERS, missing_values)

    def _validate_dependencies(self, dependencies, field, value):
        """{'type': ('dict', 'hashable', 'list'), 'check_with': 'dependencies'}"""
        if isinstance(dependencies, _str_type) or not isinstance(
            dependencies, (Iterable, Mapping)
        ):
            dependencies = (dependencies,)

        if isinstance(dependencies, Sequence):
            self.__validate_dependencies_sequence(dependencies, field)
        elif isinstance(dependencies, Mapping):
            self.__validate_dependencies_mapping(dependencies, field)

        if (
            self.document_error_tree.fetch_node_from(
                self.schema_path + (field, 'dependencies')
            )
            is not None
        ):
            return True
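    # The 'dependencies' rule takes a single field name, a sequence of field
    # names, or a mapping of field names to allowed values; a hedged schema
    # sketch with made-up fields:
    #
    #     {'field1': {'required': False},
    #      'field2': {'required': False, 'dependencies': 'field1'},
    #      'field3': {'dependencies': {'field1': ['one', 'two']}}}
    #
    # Sequences only require the named fields to be present; mappings additionally
    # constrain the values those fields may hold.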
    def __validate_dependencies_mapping(self, dependencies, field):
        validated_dependencies_counter = 0
        error_info = {}
        for dependency_name, dependency_values in dependencies.items():
            if not isinstance(dependency_values, Sequence) or isinstance(
                dependency_values, _str_type
            ):
                dependency_values = [dependency_values]

            wanted_field, wanted_field_value = self._lookup_field(dependency_name)
            if wanted_field_value in dependency_values:
                validated_dependencies_counter += 1
            else:
                error_info.update({dependency_name: wanted_field_value})

        if validated_dependencies_counter != len(dependencies):
            self._error(field, errors.DEPENDENCIES_FIELD_VALUE, error_info)

    def __validate_dependencies_sequence(self, dependencies, field):
        for dependency in dependencies:
            if self._lookup_field(dependency)[0] is None:
                self._error(field, errors.DEPENDENCIES_FIELD, dependency)

    def _validate_empty(self, empty, field, value):
        """{'type': 'boolean'}"""
        if isinstance(value, Sized) and len(value) == 0:
            self._drop_remaining_rules(
                'allowed',
                'forbidden',
                'items',
                'minlength',
                'maxlength',
                'regex',
                'check_with',
            )
            if not empty:
                self._error(field, errors.EMPTY_NOT_ALLOWED)

    def _validate_excludes(self, excluded_fields, field, value):
        """{'type': ('hashable', 'list'), 'schema': {'type': 'hashable'}}"""
        if isinstance(excluded_fields, Hashable):
            excluded_fields = [excluded_fields]

        # Mark the currently evaluated field as not required for now if it actually
        # is. At least one of the fields marked this way must be present when the
        # required fields are checked.
        if self.schema[field].get('required', self.require_all):
            self._unrequired_by_excludes.add(field)

        for excluded_field in excluded_fields:
            if excluded_field in self.schema and self.schema[field].get(
                'required', self.require_all
            ):
                self._unrequired_by_excludes.add(excluded_field)

        if any(excluded_field in self.document for excluded_field in excluded_fields):
            exclusion_str = ', '.join(
                "'{0}'".format(field) for field in excluded_fields
            )
            self._error(field, errors.EXCLUDES_FIELD, exclusion_str)

    def _validate_forbidden(self, forbidden_values, field, value):
        """{'type': 'list'}"""
        if isinstance(value, Sequence) and not isinstance(value, _str_type):
            forbidden = set(value) & set(forbidden_values)
            if forbidden:
                self._error(field, errors.FORBIDDEN_VALUES, list(forbidden))
        else:
            if value in forbidden_values:
                self._error(field, errors.FORBIDDEN_VALUE, value)

    def _validate_items(self, items, field, values):
"""{'type': 'list', 'check_with': 'items'}"""
|
|
if len(items) != len(values):
|
|
self._error(field, errors.ITEMS_LENGTH, len(items), len(values))
|
|
else:
|
|
schema = dict(
|
|
(i, definition) for i, definition in enumerate(items)
|
|
) # noqa: E501
|
|
validator = self._get_child_validator(
|
|
document_crumb=field,
|
|
schema_crumb=(field, 'items'), # noqa: E501
|
|
schema=schema,
|
|
)
|
|
if not validator(
|
|
dict((i, value) for i, value in enumerate(values)),
|
|
update=self.update,
|
|
normalize=False,
|
|
):
|
|
self._error(field, errors.BAD_ITEMS, validator._errors)
|
|
|
|
def __validate_logical(self, operator, definitions, field, value):
|
|
"""
|
|
Validates value against all definitions and logs errors according to the
|
|
operator.
|
|
"""
|
|
valid_counter = 0
|
|
_errors = errors.ErrorList()
|
|
|
|
for i, definition in enumerate(definitions):
|
|
schema = {field: definition.copy()}
|
|
for rule in ('allow_unknown', 'type'):
|
|
if rule not in schema[field] and rule in self.schema[field]:
|
|
schema[field][rule] = self.schema[field][rule]
|
|
if 'allow_unknown' not in schema[field]:
|
|
schema[field]['allow_unknown'] = self.allow_unknown
|
|
|
|
validator = self._get_child_validator(
|
|
schema_crumb=(field, operator, i), schema=schema, allow_unknown=True
|
|
)
|
|
if validator(self.document, update=self.update, normalize=False):
|
|
valid_counter += 1
|
|
else:
|
|
self._drop_nodes_from_errorpaths(validator._errors, [], [3])
|
|
_errors.extend(validator._errors)
|
|
|
|
return valid_counter, _errors
|
|
|
|
def _validate_anyof(self, definitions, field, value):
|
|
"""{'type': 'list', 'logical': 'anyof'}"""
|
|
valids, _errors = self.__validate_logical('anyof', definitions, field, value)
|
|
if valids < 1:
|
|
self._error(field, errors.ANYOF, _errors, valids, len(definitions))
|
|
|
|
def _validate_allof(self, definitions, field, value):
|
|
"""{'type': 'list', 'logical': 'allof'}"""
|
|
valids, _errors = self.__validate_logical('allof', definitions, field, value)
|
|
if valids < len(definitions):
|
|
self._error(field, errors.ALLOF, _errors, valids, len(definitions))
|
|
|
|
def _validate_noneof(self, definitions, field, value):
|
|
"""{'type': 'list', 'logical': 'noneof'}"""
|
|
valids, _errors = self.__validate_logical('noneof', definitions, field, value)
|
|
if valids > 0:
|
|
self._error(field, errors.NONEOF, _errors, valids, len(definitions))
|
|
|
|
def _validate_oneof(self, definitions, field, value):
|
|
"""{'type': 'list', 'logical': 'oneof'}"""
|
|
valids, _errors = self.__validate_logical('oneof', definitions, field, value)
|
|
if valids != 1:
|
|
self._error(field, errors.ONEOF, _errors, valids, len(definitions))
|
|
|
|
def _validate_max(self, max_value, field, value):
|
|
"""{'nullable': False }"""
|
|
try:
|
|
if value > max_value:
|
|
self._error(field, errors.MAX_VALUE)
|
|
except TypeError:
|
|
pass
|
|
|
|
def _validate_min(self, min_value, field, value):
|
|
"""{'nullable': False }"""
|
|
try:
|
|
if value < min_value:
|
|
self._error(field, errors.MIN_VALUE)
|
|
except TypeError:
|
|
pass
|
|
|
|
def _validate_maxlength(self, max_length, field, value):
|
|
"""{'type': 'integer'}"""
|
|
if isinstance(value, Iterable) and len(value) > max_length:
|
|
self._error(field, errors.MAX_LENGTH, len(value))
|
|
|
|
_validate_meta = dummy_for_rule_validation('')
|
|
|
|
def _validate_minlength(self, min_length, field, value):
|
|
"""{'type': 'integer'}"""
|
|
if isinstance(value, Iterable) and len(value) < min_length:
|
|
self._error(field, errors.MIN_LENGTH, len(value))
|
|
|
|
def _validate_nullable(self, nullable, field, value):
|
|
"""{'type': 'boolean'}"""
|
|
if value is None:
|
|
if not nullable:
|
|
self._error(field, errors.NOT_NULLABLE)
|
|
self._drop_remaining_rules(
|
|
"allof",
|
|
'allowed',
|
|
"anyof",
|
|
'empty',
|
|
'forbidden',
|
|
'items',
|
|
'keysrules',
|
|
'min',
|
|
'max',
|
|
'minlength',
|
|
'maxlength',
|
|
"noneof",
|
|
"oneof",
|
|
'regex',
|
|
'schema',
|
|
'type',
|
|
'valuesrules',
|
|
)
|
|
|
|
def _validate_keysrules(self, schema, field, value):
|
|
"""
|
|
{'type': ['dict', 'string'],
|
|
'check_with': 'bulk_schema',
|
|
'forbidden': ['rename', 'rename_handler']}
|
|
"""
|
|
if isinstance(value, Mapping):
|
|
validator = self._get_child_validator(
|
|
document_crumb=field,
|
|
schema_crumb=(field, 'keysrules'),
|
|
schema=dict(((k, schema) for k in value.keys())),
|
|
)
|
|
if not validator(dict(((k, k) for k in value.keys())), normalize=False):
|
|
self._drop_nodes_from_errorpaths(validator._errors, [], [2, 4])
|
|
self._error(field, errors.KEYSRULES, validator._errors)
|
|
|
|
def _validate_readonly(self, readonly, field, value):
|
|
"""{'type': 'boolean'}"""
|
|
if readonly:
|
|
if not self._is_normalized:
|
|
self._error(field, errors.READONLY_FIELD)
|
|
# If the document was normalized (and therefore already been
|
|
# checked for readonly fields), we still have to return True
|
|
# if an error was filed.
|
|
has_error = (
|
|
errors.READONLY_FIELD
|
|
in self.document_error_tree.fetch_errors_from(
|
|
self.document_path + (field,)
|
|
)
|
|
)
|
|
if self._is_normalized and has_error:
|
|
self._drop_remaining_rules()
|
|
|
|
def _validate_regex(self, pattern, field, value):
|
|
"""{'type': 'string'}"""
|
|
if not isinstance(value, _str_type):
|
|
return
|
|
if not pattern.endswith('$'):
|
|
pattern += '$'
|
|
re_obj = re.compile(pattern)
|
|
if not re_obj.match(value):
|
|
self._error(field, errors.REGEX_MISMATCH)
|
|
|
|
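    # Note on 'regex': the pattern is matched with ``re.match`` and a trailing
    # ``'$'`` is appended when missing, so it must match the entire value; a
    # hedged sketch with a made-up field:
    #
    #     {'code': {'type': 'string', 'regex': '[A-Z]{3}'}}
    #
    # would accept 'ABC' but reject 'ABCD' and 'xABC'.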
    _validate_required = dummy_for_rule_validation(""" {'type': 'boolean'} """)

    _validate_require_all = dummy_for_rule_validation(""" {'type': 'boolean'} """)

    def __validate_required_fields(self, document):
        """
        Validates that required fields are not missing.

        :param document: The document being validated.
        """
        try:
            required = set(
                field
                for field, definition in self.schema.items()
                if self._resolve_rules_set(definition).get('required', self.require_all)
                is True
            )
        except AttributeError:
            if self.is_child and self.schema_path[-1] == 'schema':
                raise _SchemaRuleTypeError
            else:
                raise
        required -= self._unrequired_by_excludes
        missing = required - set(
            field
            for field in document
            if document.get(field) is not None or not self.ignore_none_values
        )

        for field in missing:
            self._error(field, errors.REQUIRED_FIELD)

        # At least one field from self._unrequired_by_excludes should be present in
        # document.
        if self._unrequired_by_excludes:
            fields = set(field for field in document if document.get(field) is not None)
            if self._unrequired_by_excludes.isdisjoint(fields):
                for field in self._unrequired_by_excludes - fields:
                    self._error(field, errors.REQUIRED_FIELD)

    def _validate_schema(self, schema, field, value):
        """
        {'type': ['dict', 'string'],
         'anyof': [{'check_with': 'schema'},
                   {'check_with': 'bulk_schema'}]}
        """
        if schema is None:
            return

        if isinstance(value, Sequence) and not isinstance(value, _str_type):
            self.__validate_schema_sequence(field, schema, value)
        elif isinstance(value, Mapping):
            self.__validate_schema_mapping(field, schema, value)

    def __validate_schema_mapping(self, field, schema, value):
        schema = self._resolve_schema(schema)
        field_rules = self._resolve_rules_set(self.schema[field])
        validator = self._get_child_validator(
            document_crumb=field,
            schema_crumb=(field, 'schema'),
            schema=schema,
            allow_unknown=field_rules.get('allow_unknown', self.allow_unknown),
            require_all=field_rules.get('require_all', self.require_all),
        )
        try:
            if not validator(value, update=self.update, normalize=False):
                self._error(field, errors.MAPPING_SCHEMA, validator._errors)
        except _SchemaRuleTypeError:
            self._error(field, errors.BAD_TYPE_FOR_SCHEMA)
            raise

    def __validate_schema_sequence(self, field, schema, value):
        schema = dict(((i, schema) for i in range(len(value))))
        validator = self._get_child_validator(
            document_crumb=field,
            schema_crumb=(field, 'schema'),
            schema=schema,
            allow_unknown=self.allow_unknown,
        )
        validator(
            dict(((i, v) for i, v in enumerate(value))),
            update=self.update,
            normalize=False,
        )

        if validator._errors:
            self._drop_nodes_from_errorpaths(validator._errors, [], [2])
            self._error(field, errors.SEQUENCE_SCHEMA, validator._errors)

    def _validate_type(self, data_type, field, value):
        """
        {'type': ['string', 'list'],
         'check_with': 'type'}
        """
        if not data_type:
            return

        types = (data_type,) if isinstance(data_type, _str_type) else data_type

        for _type in types:
            # TODO remove this block on next major release
            # this implementation still supports custom type validation methods
            type_definition = self.types_mapping.get(_type)
            if type_definition is not None:
                matched = isinstance(
                    value, type_definition.included_types
                ) and not isinstance(value, type_definition.excluded_types)
            else:
                type_handler = self.__get_rule_handler('validate_type', _type)
                matched = type_handler(value)
            if matched:
                return

            # TODO uncomment this block on next major release
            # when _validate_type_* methods were deprecated:
            # type_definition = self.types_mapping[_type]
            # if isinstance(value, type_definition.included_types) \
            #         and not isinstance(value, type_definition.excluded_types):  # noqa 501
            #     return

        self._error(field, errors.BAD_TYPE)
        self._drop_remaining_rules()

    def _validate_valuesrules(self, schema, field, value):
        """
        {'type': ['dict', 'string'],
         'check_with': 'bulk_schema',
         'forbidden': ['rename', 'rename_handler']}
        """
        schema_crumb = (field, 'valuesrules')
        if isinstance(value, Mapping):
            validator = self._get_child_validator(
                document_crumb=field,
                schema_crumb=schema_crumb,
                schema=dict((k, schema) for k in value),
            )
            validator(value, update=self.update, normalize=False)
            if validator._errors:
                self._drop_nodes_from_errorpaths(validator._errors, [], [2])
                self._error(field, errors.VALUESRULES, validator._errors)


RULE_SCHEMA_SEPARATOR = "The rule's arguments are validated against this schema:"


class InspectedValidator(type):
"""Metaclass for all validators"""
|
|
|
|
def __new__(cls, *args):
|
|
if '__doc__' not in args[2]:
|
|
args[2].update({'__doc__': args[1][0].__doc__})
|
|
return super(InspectedValidator, cls).__new__(cls, *args)
|
|
|
|
def __init__(cls, *args):
|
|
def attributes_with_prefix(prefix):
|
|
return tuple(
|
|
x[len(prefix) + 2 :]
|
|
for x in dir(cls)
|
|
if x.startswith('_' + prefix + '_')
|
|
)
|
|
|
|
super(InspectedValidator, cls).__init__(*args)
|
|
|
|
cls._types_from_methods, cls.validation_rules = (), {}
|
|
for attribute in attributes_with_prefix('validate'):
|
|
# TODO remove inspection of type test methods in next major release
|
|
if attribute.startswith('type_'):
|
|
cls._types_from_methods += (attribute[len('type_') :],)
|
|
else:
|
|
cls.validation_rules[attribute] = cls.__get_rule_schema(
|
|
'_validate_' + attribute
|
|
)
|
|
|
|
# TODO remove on next major release
|
|
if cls._types_from_methods:
|
|
warn(
|
|
"Methods for type testing are deprecated, use TypeDefinition "
|
|
"and the 'types_mapping'-property of a Validator-instance "
|
|
"instead.",
|
|
DeprecationWarning,
|
|
)
|
|
|
|
# TODO remove second summand on next major release
|
|
cls.checkers = tuple(x for x in attributes_with_prefix('check_with')) + tuple(
|
|
x for x in attributes_with_prefix('validator')
|
|
)
|
|
x = cls.validation_rules['check_with']['oneof']
|
|
x[1]['schema']['oneof'][1]['allowed'] = x[2]['allowed'] = cls.checkers
|
|
|
|
for rule in (x for x in cls.mandatory_validations if x != 'nullable'):
|
|
cls.validation_rules[rule]['required'] = True
|
|
|
|
cls.coercers, cls.default_setters, cls.normalization_rules = (), (), {}
|
|
for attribute in attributes_with_prefix('normalize'):
|
|
if attribute.startswith('coerce_'):
|
|
cls.coercers += (attribute[len('coerce_') :],)
|
|
elif attribute.startswith('default_setter_'):
|
|
cls.default_setters += (attribute[len('default_setter_') :],)
|
|
else:
|
|
cls.normalization_rules[attribute] = cls.__get_rule_schema(
|
|
'_normalize_' + attribute
|
|
)
|
|
|
|
for rule in ('coerce', 'rename_handler'):
|
|
x = cls.normalization_rules[rule]['oneof']
|
|
x[1]['schema']['oneof'][1]['allowed'] = x[2]['allowed'] = cls.coercers
|
|
cls.normalization_rules['default_setter']['oneof'][1][
|
|
'allowed'
|
|
] = cls.default_setters
|
|
|
|
cls.rules = {}
|
|
cls.rules.update(cls.validation_rules)
|
|
cls.rules.update(cls.normalization_rules)
|
|
|
|
def __get_rule_schema(cls, method_name):
|
|
docstring = getattr(cls, method_name).__doc__
|
|
if docstring is None:
|
|
result = {}
|
|
else:
|
|
if RULE_SCHEMA_SEPARATOR in docstring:
|
|
docstring = docstring.split(RULE_SCHEMA_SEPARATOR)[1]
|
|
try:
|
|
result = literal_eval(docstring.strip())
|
|
except Exception:
|
|
result = {}
|
|
|
|
if not result and method_name != '_validate_meta':
|
|
warn(
|
|
"No validation schema is defined for the arguments of rule "
|
|
"'%s'" % method_name.split('_', 2)[-1]
|
|
)
|
|
|
|
return result
|
|
|
|
|
|
Validator = InspectedValidator('Validator', (BareValidator,), {})
|
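# A minimal, optional smoke-test sketch (not part of the library); the schema and
# documents below are hypothetical and only illustrate the assembled ``Validator``:
if __name__ == '__main__':  # pragma: no cover
    _v = Validator({'name': {'type': 'string', 'required': True}})
    assert _v({'name': 'Ada'}) is True
    assert _v({}) is False and 'name' in _v.errors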