| # Copyright (c) The PyAMF Project. |
| # See LICENSE.txt for details. |
| |
| """ |
| Class alias base functionality. |
| |
| @since: 0.6 |
| """ |
| |
| import inspect |
| |
| import pyamf |
| from pyamf import python, util |
| |
| |
| class UnknownClassAlias(Exception): |
| """ |
| Raised if the AMF stream specifies an Actionscript class that does not |
| have a Python class alias. |
| |
| @see: L{register_class} |
| """ |
| |
| |
| class ClassAlias(object): |
| """ |
| Class alias. Provides class/instance meta data to the En/Decoder to allow |
| fine grain control and some performance increases. |
| """ |
| |
| def __init__(self, klass, alias=None, **kwargs): |
| if not isinstance(klass, python.class_types): |
| raise TypeError('klass must be a class type, got %r' % type(klass)) |
| |
| self.checkClass(klass) |
| |
| self.klass = klass |
| self.alias = alias |
| |
| if hasattr(self.alias, 'decode'): |
| self.alias = self.alias.decode('utf-8') |
| |
| self.static_attrs = kwargs.pop('static_attrs', None) |
| self.exclude_attrs = kwargs.pop('exclude_attrs', None) |
| self.readonly_attrs = kwargs.pop('readonly_attrs', None) |
| self.proxy_attrs = kwargs.pop('proxy_attrs', None) |
| self.amf3 = kwargs.pop('amf3', None) |
| self.external = kwargs.pop('external', None) |
| self.dynamic = kwargs.pop('dynamic', None) |
| self.synonym_attrs = kwargs.pop('synonym_attrs', {}) |
| |
| self._compiled = False |
| self.anonymous = False |
| self.sealed = None |
| self.bases = None |
| |
| if self.alias is None: |
| self.anonymous = True |
| # we don't set this to None because AMF3 untyped objects have a |
| # class name of '' |
| self.alias = '' |
| else: |
| if self.alias == '': |
| raise ValueError('Cannot set class alias as \'\'') |
| |
| if not kwargs.pop('defer', False): |
| self.compile() |
| |
| if kwargs: |
| raise TypeError('Unexpected keyword arguments %r' % (kwargs,)) |
| |
| def _checkExternal(self): |
| k = self.klass |
| |
| if not hasattr(k, '__readamf__'): |
| raise AttributeError("An externalised class was specified, but" |
| " no __readamf__ attribute was found for %r" % (k,)) |
| |
| if not hasattr(k, '__writeamf__'): |
| raise AttributeError("An externalised class was specified, but" |
| " no __writeamf__ attribute was found for %r" % (k,)) |
| |
| if not hasattr(k.__readamf__, '__call__'): |
| raise TypeError("%s.__readamf__ must be callable" % (k.__name__,)) |
| |
| if not hasattr(k.__writeamf__, '__call__'): |
| raise TypeError("%s.__writeamf__ must be callable" % (k.__name__,)) |
| |
| def compile(self): |
| """ |
| This compiles the alias into a form that can be of most benefit to the |
| en/decoder. |
| """ |
| if self._compiled: |
| return |
| |
| self.decodable_properties = set() |
| self.encodable_properties = set() |
| self.inherited_dynamic = None |
| self.inherited_sealed = None |
| self.bases = [] |
| |
| self.exclude_attrs = set(self.exclude_attrs or []) |
| self.readonly_attrs = set(self.readonly_attrs or []) |
| self.static_attrs = list(self.static_attrs or []) |
| self.static_attrs_set = set(self.static_attrs) |
| self.proxy_attrs = set(self.proxy_attrs or []) |
| |
| self.sealed = util.is_class_sealed(self.klass) |
| |
| if self.external: |
| self._checkExternal() |
| self._finalise_compile() |
| |
| # this class is external so no more compiling is necessary |
| return |
| |
| if hasattr(self.klass, '__slots__'): |
| self.decodable_properties.update(self.klass.__slots__) |
| self.encodable_properties.update(self.klass.__slots__) |
| |
| for k, v in self.klass.__dict__.iteritems(): |
| if not isinstance(v, property): |
| continue |
| |
| if v.fget: |
| self.encodable_properties.update([k]) |
| |
| if v.fset: |
| self.decodable_properties.update([k]) |
| else: |
| self.readonly_attrs.update([k]) |
| |
| mro = inspect.getmro(self.klass)[1:] |
| |
| for c in mro: |
| self._compile_base_class(c) |
| |
| self.getCustomProperties() |
| |
| self._finalise_compile() |
| |
| def _compile_base_class(self, klass): |
| if klass is object: |
| return |
| |
| try: |
| alias = pyamf.get_class_alias(klass) |
| except UnknownClassAlias: |
| alias = pyamf.register_class(klass) |
| |
| alias.compile() |
| |
| self.bases.append((klass, alias)) |
| |
| if alias.exclude_attrs: |
| self.exclude_attrs.update(alias.exclude_attrs) |
| |
| if alias.readonly_attrs: |
| self.readonly_attrs.update(alias.readonly_attrs) |
| |
| if alias.static_attrs: |
| self.static_attrs_set.update(alias.static_attrs) |
| |
| for a in alias.static_attrs: |
| if a not in self.static_attrs: |
| self.static_attrs.insert(0, a) |
| |
| if alias.proxy_attrs: |
| self.proxy_attrs.update(alias.proxy_attrs) |
| |
| if alias.encodable_properties: |
| self.encodable_properties.update(alias.encodable_properties) |
| |
| if alias.decodable_properties: |
| self.decodable_properties.update(alias.decodable_properties) |
| |
| if self.amf3 is None and alias.amf3: |
| self.amf3 = alias.amf3 |
| |
| if self.dynamic is None and alias.dynamic is not None: |
| self.inherited_dynamic = alias.dynamic |
| |
| if alias.sealed is not None: |
| self.inherited_sealed = alias.sealed |
| |
| if alias.synonym_attrs: |
| self.synonym_attrs, x = alias.synonym_attrs.copy(), self.synonym_attrs |
| self.synonym_attrs.update(x) |
| |
| def _finalise_compile(self): |
| if self.dynamic is None: |
| self.dynamic = True |
| |
| if self.inherited_dynamic is not None: |
| if self.inherited_dynamic is False and not self.sealed and self.inherited_sealed: |
| self.dynamic = True |
| else: |
| self.dynamic = self.inherited_dynamic |
| |
| if self.sealed: |
| self.dynamic = False |
| |
| if self.amf3 is None: |
| self.amf3 = False |
| |
| if self.external is None: |
| self.external = False |
| |
| if self.static_attrs: |
| self.encodable_properties.update(self.static_attrs) |
| self.decodable_properties.update(self.static_attrs) |
| |
| if self.static_attrs: |
| if self.exclude_attrs: |
| self.static_attrs_set.difference_update(self.exclude_attrs) |
| |
| for a in self.static_attrs_set: |
| if a not in self.static_attrs: |
| self.static_attrs.remove(a) |
| |
| if not self.exclude_attrs: |
| self.exclude_attrs = None |
| else: |
| self.encodable_properties.difference_update(self.exclude_attrs) |
| self.decodable_properties.difference_update(self.exclude_attrs) |
| |
| if self.exclude_attrs is not None: |
| self.exclude_attrs = list(self.exclude_attrs) |
| self.exclude_attrs.sort() |
| |
| if not self.readonly_attrs: |
| self.readonly_attrs = None |
| else: |
| self.decodable_properties.difference_update(self.readonly_attrs) |
| |
| if self.readonly_attrs is not None: |
| self.readonly_attrs = list(self.readonly_attrs) |
| self.readonly_attrs.sort() |
| |
| if not self.proxy_attrs: |
| self.proxy_attrs = None |
| else: |
| self.proxy_attrs = list(self.proxy_attrs) |
| self.proxy_attrs.sort() |
| |
| if len(self.decodable_properties) == 0: |
| self.decodable_properties = None |
| else: |
| self.decodable_properties = list(self.decodable_properties) |
| self.decodable_properties.sort() |
| |
| if len(self.encodable_properties) == 0: |
| self.encodable_properties = None |
| else: |
| self.encodable_properties = list(self.encodable_properties) |
| self.encodable_properties.sort() |
| |
| self.non_static_encodable_properties = None |
| |
| if self.encodable_properties: |
| self.non_static_encodable_properties = set(self.encodable_properties) |
| |
| if self.static_attrs: |
| self.non_static_encodable_properties.difference_update(self.static_attrs) |
| |
| self.shortcut_encode = True |
| self.shortcut_decode = True |
| |
| if (self.encodable_properties or self.static_attrs or |
| self.exclude_attrs or self.proxy_attrs or self.external or |
| self.synonym_attrs): |
| self.shortcut_encode = False |
| |
| if (self.decodable_properties or self.static_attrs or |
| self.exclude_attrs or self.readonly_attrs or |
| not self.dynamic or self.external or self.synonym_attrs): |
| self.shortcut_decode = False |
| |
| self.is_dict = False |
| |
| if issubclass(self.klass, dict) or self.klass is dict: |
| self.is_dict = True |
| |
| self._compiled = True |
| |
| def is_compiled(self): |
| return self._compiled |
| |
| def __str__(self): |
| return self.alias |
| |
| def __repr__(self): |
| k = self.__class__ |
| |
| return '<%s.%s alias=%r class=%r @ 0x%x>' % (k.__module__, k.__name__, |
| self.alias, self.klass, id(self)) |
| |
| def __eq__(self, other): |
| if isinstance(other, basestring): |
| return self.alias == other |
| elif isinstance(other, self.__class__): |
| return self.klass == other.klass |
| elif isinstance(other, python.class_types): |
| return self.klass == other |
| else: |
| return False |
| |
| def __hash__(self): |
| return id(self) |
| |
| def checkClass(self, klass): |
| """ |
| This function is used to check if the class being aliased fits certain |
| criteria. The default is to check that C{__new__} is available or the |
| C{__init__} constructor does not need additional arguments. If this is |
| the case then L{TypeError} will be raised. |
| |
| @since: 0.4 |
| """ |
| # Check for __new__ support. |
| if hasattr(klass, '__new__') and hasattr(klass.__new__, '__call__'): |
| # Should be good to go. |
| return |
| |
| # Check that the constructor of the class doesn't require any additonal |
| # arguments. |
| if not (hasattr(klass, '__init__') and hasattr(klass.__init__, '__call__')): |
| return |
| |
| klass_func = klass.__init__.im_func |
| |
| if not hasattr(klass_func, 'func_code'): |
| # Can't examine it, assume it's OK. |
| return |
| |
| if klass_func.func_defaults: |
| available_arguments = len(klass_func.func_defaults) + 1 |
| else: |
| available_arguments = 1 |
| |
| needed_arguments = klass_func.func_code.co_argcount |
| |
| if available_arguments >= needed_arguments: |
| # Looks good to me. |
| return |
| |
| spec = inspect.getargspec(klass_func) |
| |
| raise TypeError("__init__ doesn't support additional arguments: %s" |
| % inspect.formatargspec(*spec)) |
| |
| def getEncodableAttributes(self, obj, codec=None): |
| """ |
| Must return a C{dict} of attributes to be encoded, even if its empty. |
| |
| @param codec: An optional argument that will contain the encoder |
| instance calling this function. |
| @since: 0.5 |
| """ |
| if not self._compiled: |
| self.compile() |
| |
| if self.is_dict: |
| return dict(obj) |
| |
| if self.shortcut_encode and self.dynamic: |
| return obj.__dict__.copy() |
| |
| attrs = {} |
| |
| if self.static_attrs: |
| for attr in self.static_attrs: |
| attrs[attr] = getattr(obj, attr, pyamf.Undefined) |
| |
| if not self.dynamic: |
| if self.non_static_encodable_properties: |
| for attr in self.non_static_encodable_properties: |
| attrs[attr] = getattr(obj, attr) |
| |
| return attrs |
| |
| dynamic_props = util.get_properties(obj) |
| |
| if not self.shortcut_encode: |
| dynamic_props = set(dynamic_props) |
| |
| if self.encodable_properties: |
| dynamic_props.update(self.encodable_properties) |
| |
| if self.static_attrs: |
| dynamic_props.difference_update(self.static_attrs) |
| |
| if self.exclude_attrs: |
| dynamic_props.difference_update(self.exclude_attrs) |
| |
| for attr in dynamic_props: |
| attrs[attr] = getattr(obj, attr) |
| |
| if self.proxy_attrs is not None and attrs and codec: |
| context = codec.context |
| |
| for k, v in attrs.copy().iteritems(): |
| if k in self.proxy_attrs: |
| attrs[k] = context.getProxyForObject(v) |
| |
| if self.synonym_attrs: |
| missing = object() |
| |
| for k, v in self.synonym_attrs.iteritems(): |
| value = attrs.pop(k, missing) |
| |
| if value is missing: |
| continue |
| |
| attrs[v] = value |
| |
| return attrs |
| |
| def getDecodableAttributes(self, obj, attrs, codec=None): |
| """ |
| Returns a dictionary of attributes for C{obj} that has been filtered, |
| based on the supplied C{attrs}. This allows for fine grain control |
| over what will finally end up on the object or not. |
| |
| @param obj: The object that will recieve the attributes. |
| @param attrs: The C{attrs} dictionary that has been decoded. |
| @param codec: An optional argument that will contain the decoder |
| instance calling this function. |
| @return: A dictionary of attributes that can be applied to C{obj} |
| @since: 0.5 |
| """ |
| if not self._compiled: |
| self.compile() |
| |
| changed = False |
| |
| props = set(attrs.keys()) |
| |
| if self.static_attrs: |
| missing_attrs = self.static_attrs_set.difference(props) |
| |
| if missing_attrs: |
| raise AttributeError('Static attributes %r expected ' |
| 'when decoding %r' % (missing_attrs, self.klass)) |
| |
| props.difference_update(self.static_attrs) |
| |
| if not props: |
| return attrs |
| |
| if not self.dynamic: |
| if not self.decodable_properties: |
| props = set() |
| else: |
| props.intersection_update(self.decodable_properties) |
| |
| changed = True |
| |
| if self.readonly_attrs: |
| props.difference_update(self.readonly_attrs) |
| changed = True |
| |
| if self.exclude_attrs: |
| props.difference_update(self.exclude_attrs) |
| changed = True |
| |
| if self.proxy_attrs is not None and codec: |
| context = codec.context |
| |
| for k in self.proxy_attrs: |
| try: |
| v = attrs[k] |
| except KeyError: |
| continue |
| |
| attrs[k] = context.getObjectForProxy(v) |
| |
| if self.synonym_attrs: |
| missing = object() |
| |
| for k, v in self.synonym_attrs.iteritems(): |
| value = attrs.pop(k, missing) |
| |
| if value is missing: |
| continue |
| |
| attrs[v] = value |
| |
| if not changed: |
| return attrs |
| |
| a = {} |
| |
| [a.__setitem__(p, attrs[p]) for p in props] |
| |
| return a |
| |
| def applyAttributes(self, obj, attrs, codec=None): |
| """ |
| Applies the collection of attributes C{attrs} to aliased object C{obj}. |
| Called when decoding reading aliased objects from an AMF byte stream. |
| |
| Override this to provide fine grain control of application of |
| attributes to C{obj}. |
| |
| @param codec: An optional argument that will contain the en/decoder |
| instance calling this function. |
| """ |
| if not self._compiled: |
| self.compile() |
| |
| if self.shortcut_decode: |
| if self.is_dict: |
| obj.update(attrs) |
| |
| return |
| |
| if not self.sealed: |
| obj.__dict__.update(attrs) |
| |
| return |
| |
| else: |
| attrs = self.getDecodableAttributes(obj, attrs, codec=codec) |
| |
| util.set_attrs(obj, attrs) |
| |
| def getCustomProperties(self): |
| """ |
| Overrride this to provide known static properties based on the aliased |
| class. |
| |
| @since: 0.5 |
| """ |
| |
| def createInstance(self, codec=None): |
| """ |
| Creates an instance of the klass. |
| |
| @return: Instance of C{self.klass}. |
| """ |
| if type(self.klass) is type: |
| return self.klass.__new__(self.klass) |
| |
| return self.klass() |