Source code for nti.schema.schema

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Helpers for writing code that implements schemas.

This module contains code based on code originally from dm.zope.schema.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from zope.deferredimport import deprecatedFrom

from zope.interface.interfaces import IInterface
from zope.interface.interfaces import ISpecification
from zope.interface import providedBy
from zope.interface import implementer

from zope.schema.interfaces import IValidatable
from zope.schema.fieldproperty import FieldProperty

from .interfaces import ISchemaConfigured

__docformat__ = "restructuredtext en"


def schemaitems(spec, _field_key=lambda x: x[1].order):
    """
    schemaitems(spec) -> [(name, field)]

    Return the schema fields of the interface specification *spec* as
    a list of ``(name, field)`` pairs, sorted by each field's ``order``
    attribute (that is, in the order the fields were defined).

    *spec* may be anything accepted by :func:`schemadict`.
    """
    pairs = list(schemadict(spec).items())
    # list.sort is stable, exactly like sorted(), so fields that share
    # an ``order`` keep their relative position.
    pairs.sort(key=_field_key)
    return pairs
def schemadict(spec):
    """
    schemadict(spec) -> dict

    Return the schema fields of the interface specification *spec* as
    a mapping from field name to field object.

    The returned dictionary *must* be treated as immutable: when
    possible it is cached on *spec* and shared between callers.

    *spec* can be:

    - A single Interface;
    - An object implementing
      ``zope.interface.interfaces.ISpecification``, such as that
      returned by ``providedBy(instance)`` or
      ``implementedBy(factory)`` (an Interface is a special case of
      this).
    - A list, tuple, or other iterable of Interfaces.

    For the first two kinds the result is cached following the usual
    interface caching rules, so changes to interface bases, or to what
    an object or class provides or implements, are detected. Merely
    assigning a new field to an existing interface may *not* be
    detected (``Interface.get()`` has the same limitation) unless
    ``Interface.changed()`` is called.

    .. versionchanged:: 1.15.0
       Added caching and re-implemented the schemadict algorithm for speed.
       The return value must now be treated as immutable.
    """
    cache_key = '__nti_schema_schemadict'

    # ``None`` means "there is nowhere to store a cache". Whenever the
    # attribute exists we end up holding a real mapping (see below).
    cache_in = None
    try:
        cache_in = spec._v_attrs # pylint:disable=protected-access
    except AttributeError:
        # As of zope.interface 5.0 specifications always carry this
        # attribute, so *spec* must be a plain iterable of interfaces.
        pass
    else:
        try:
            return cache_in[cache_key]
        except KeyError:
            # There is a cache, but we have not populated it yet.
            pass
        except TypeError:
            # The cache slot exists but has not been initialised.
            assert cache_in is None
            cache_in = spec._v_attrs = {}

    # ``zope.schema.getFields`` and ``getFieldsInOrder`` only handle a
    # single interface. Historically the other cases (``providedBy``
    # being the most common) were handled by building a new interface
    # at runtime with *spec* as its bases, which is very slow for
    # complex hierarchies. Looking at what we were given is much
    # cheaper.

    # Step one: reduce *spec* to Interface objects in resolution order.
    if IInterface.providedBy(spec):
        interfaces = (spec,)
    elif ISpecification.providedBy(spec):
        interfaces = spec.__iro__
    else:
        interfaces = spec

    # Step two: collect the most derived definition of every field.
    # ``zope.schema.getFields(iface)`` iterates the interface (the same
    # as ``iface.names(all=True)``) and then indexes into it for each
    # attribute; walking ``namesAndDescriptions`` ourselves saves steps
    # at the cost of explicit method calls.
    fields = {}
    is_field = IValidatable.providedBy
    for iface in interfaces:
        for name, attr in iface.namesAndDescriptions():
            # The first (most derived) definition of a name wins.
            if name not in fields and is_field(attr):
                fields[name] = attr

    # Step three: remember the answer if we have somewhere to put it.
    # ``_v_attrs`` is deliberately not looked up again in case it was
    # replaced concurrently.
    if cache_in is not None:
        cache_in[cache_key] = fields
    return fields
# Unique sentinel used as a ``getattr`` default, so that "attribute is
# missing" can be told apart from any real attribute value (including None).
_marker = object()
@implementer(ISchemaConfigured)
class SchemaConfigured(object):
    """
    Mixin class that configures instances from the schema components
    they provide.

    Keyword arguments given to the constructor are assigned as
    attributes (they must name schema fields), and every remaining
    schema field that has no value yet is set to the field's default.

    This class is fastest if most of the attributes are represented by
    ``FieldProperty`` objects.

    .. versionchanged:: 1.15
       Special case ``FieldProperty`` instances found in the type
       when checking whether a value has been provided. We now assume
       that if there is no matching item in the dict with the same
       name, no value was provided. Note that if the schema field
       contained in the ``FieldProperty`` did something funky in its
       ``bind()`` method to this object, that will no longer happen at
       construction time. This can be turned off by setting
       ``SC_OPTIMIZE_FIELD_PROPERTY`` to false.

       If you add a FieldProperty to a ``SchemaConfigured`` class
       after an instance has been created, you must call
       ``sc_changed``.
    """

    # When true, fields backed by a matching ``FieldProperty`` on the
    # class are skipped while filling in defaults.
    SC_OPTIMIZE_FIELD_PROPERTY = True

    # provide control over which interfaces define the data schema
    SC_SCHEMAS = None

    # Name of the class attribute under which the set of elided
    # FieldProperty names is cached (per class, never inherited).
    __FP_KEY = '__SchemaConfigured_elide_fieldproperty'

    def __init__(self, **kw):
        schema = schemadict(self.sc_schema_spec())

        # Assign keywords one at a time, rejecting the first one that
        # is not a schema field (might want to control this check).
        for name, value in kw.items():
            if name in schema:
                setattr(self, name, value)
                continue
            raise TypeError('non schema keyword argument: %s' % name)

        # Provide default values for schema fields not set.
        # In bench_schemaconfigured.py, if the fields are FieldProperty
        # objects found in the type, checking whether they are set took
        # 96% of the total time. Special-casing them is much faster
        # (33us -> 9.1us) without hurting the non-FieldProperty case
        # too much.
        if self.SC_OPTIMIZE_FIELD_PROPERTY:
            schema = self.__elide_fieldproperty(schema)

        for field_name, schema_field in schema.items():
            # TODO: I think we could do better by first checking
            # to see if field_name is in vars(type(self))?
            # ``getattr`` with a sentinel is used rather than
            # ``hasattr`` to avoid hiding exceptions (which the builtin
            # hasattr() does on Python 2).
            if field_name not in kw \
               and getattr(self, field_name, _marker) is _marker:
                setattr(self, field_name, schema_field.default)

    @classmethod
    def __elide_fieldproperty(cls, schema):
        # Return a copy of *schema* without the fields that are served
        # by a matching FieldProperty on the class. The set of such
        # names is computed once and stored in this class's own
        # ``__dict__``.
        matches = cls.__dict__.get(cls.__FP_KEY, _marker)
        if matches is _marker:
            matches = cls.__find_FieldProperty_that_match_schema(schema)
            setattr(cls, cls.__FP_KEY, matches)

        return {
            name: field
            for name, field in schema.items()
            if name not in matches
        }

    @classmethod
    def __find_FieldProperty_that_match_schema(cls, schema_dict):
        # Return the set of field names for which the class exposes a
        # FieldProperty wrapping an equal schema field.
        found = set()
        for field_name, schema_field in schema_dict.items():
            # If these are descriptors, this runs code. We don't look in
            # the type's __dict__ because we would need to manually walk
            # up the mro().
            kind_value = getattr(cls, field_name, _marker)
            if kind_value is _marker:
                continue
            if not isinstance(kind_value, FieldProperty):
                continue

            # pylint:disable=protected-access
            if kind_value._FieldProperty__field == schema_field:
                # These are data-descriptors, with both __get__ and
                # __set__, so they're in full control. They
                # automatically return the default value of the field
                # (that they have), so we don't need to copy it down
                # from our schema field.
                found.add(field_name)
        return found

    @classmethod
    def sc_changed(cls, orig_changed=None):
        """
        Call this method if you assign a fieldproperty to this class
        after creation.

        It discards the cached set of FieldProperty names stored on
        this class. The *orig_changed* argument is accepted but not
        used.
        """
        if cls.__FP_KEY not in cls.__dict__:
            return

        # If this happens concurrently and we hit a super class, that's
        # fine.
        try:
            delattr(cls, cls.__FP_KEY)
        except AttributeError: # pragma: no cover
            pass

    def sc_schema_spec(self):
        """
        Return the schema specification which determines the data
        schema.

        This is `SC_SCHEMAS` when that is set to a true value, and
        `providedBy(self)` otherwise.
        """
        configured = self.SC_SCHEMAS
        if configured:
            return configured
        return providedBy(self)
class PermissiveSchemaConfigured(SchemaConfigured):
    """
    A mixin subclass of :class:`SchemaConfigured` that silently ignores
    extra keyword arguments (those not defined in the schema) instead
    of rejecting them.

    This is an aid to evolution of code and can be helpful in testing.

    To allow for one-by-one conversions and updates, this class
    defines an attribute ``SC_PERMISSIVE``, defaulting to True, that
    controls this behaviour. When it is false, construction behaves
    exactly like :class:`SchemaConfigured`.
    """

    SC_PERMISSIVE = True

    def __init__(self, **kwargs):
        if self.SC_PERMISSIVE:
            # Drop every keyword that does not name a schema field
            # before handing over to the strict base constructor.
            known = schemadict(self.sc_schema_spec())
            kwargs = {
                name: value
                for name, value in kwargs.items()
                if name in known
            }
        super(PermissiveSchemaConfigured, self).__init__(**kwargs)
# Backwards compatibility: ``EqHash`` and ``_superhash`` now live in
# ``nti.schema.eqhash``. Keep them reachable from this module through
# zope.deferredimport's deprecation helper, with the message given here.
deprecatedFrom("Moved to nti.schema.eqhash", "nti.schema.eqhash", 'EqHash', '_superhash')