blob: c0d7dc422c47e037dd2d757b9484389da3e2cfef [file] [log] [blame]
# Copyright (c) The PyAMF Project.
# See LICENSE.txt for details.
"""
Class alias base functionality.
@since: 0.6
"""
import inspect
import pyamf
from pyamf import python, util
class UnknownClassAlias(Exception):
"""
Raised if the AMF stream specifies an Actionscript class that does not
have a Python class alias.
@see: L{register_class}
"""
class ClassAlias(object):
"""
Class alias. Provides class/instance meta data to the En/Decoder to allow
fine grain control and some performance increases.
"""
def __init__(self, klass, alias=None, **kwargs):
if not isinstance(klass, python.class_types):
raise TypeError('klass must be a class type, got %r' % type(klass))
self.checkClass(klass)
self.klass = klass
self.alias = alias
if hasattr(self.alias, 'decode'):
self.alias = self.alias.decode('utf-8')
self.static_attrs = kwargs.pop('static_attrs', None)
self.exclude_attrs = kwargs.pop('exclude_attrs', None)
self.readonly_attrs = kwargs.pop('readonly_attrs', None)
self.proxy_attrs = kwargs.pop('proxy_attrs', None)
self.amf3 = kwargs.pop('amf3', None)
self.external = kwargs.pop('external', None)
self.dynamic = kwargs.pop('dynamic', None)
self.synonym_attrs = kwargs.pop('synonym_attrs', {})
self._compiled = False
self.anonymous = False
self.sealed = None
self.bases = None
if self.alias is None:
self.anonymous = True
# we don't set this to None because AMF3 untyped objects have a
# class name of ''
self.alias = ''
else:
if self.alias == '':
raise ValueError('Cannot set class alias as \'\'')
if not kwargs.pop('defer', False):
self.compile()
if kwargs:
raise TypeError('Unexpected keyword arguments %r' % (kwargs,))
def _checkExternal(self):
k = self.klass
if not hasattr(k, '__readamf__'):
raise AttributeError("An externalised class was specified, but"
" no __readamf__ attribute was found for %r" % (k,))
if not hasattr(k, '__writeamf__'):
raise AttributeError("An externalised class was specified, but"
" no __writeamf__ attribute was found for %r" % (k,))
if not hasattr(k.__readamf__, '__call__'):
raise TypeError("%s.__readamf__ must be callable" % (k.__name__,))
if not hasattr(k.__writeamf__, '__call__'):
raise TypeError("%s.__writeamf__ must be callable" % (k.__name__,))
def compile(self):
"""
This compiles the alias into a form that can be of most benefit to the
en/decoder.
"""
if self._compiled:
return
self.decodable_properties = set()
self.encodable_properties = set()
self.inherited_dynamic = None
self.inherited_sealed = None
self.bases = []
self.exclude_attrs = set(self.exclude_attrs or [])
self.readonly_attrs = set(self.readonly_attrs or [])
self.static_attrs = list(self.static_attrs or [])
self.static_attrs_set = set(self.static_attrs)
self.proxy_attrs = set(self.proxy_attrs or [])
self.sealed = util.is_class_sealed(self.klass)
if self.external:
self._checkExternal()
self._finalise_compile()
# this class is external so no more compiling is necessary
return
if hasattr(self.klass, '__slots__'):
self.decodable_properties.update(self.klass.__slots__)
self.encodable_properties.update(self.klass.__slots__)
for k, v in self.klass.__dict__.iteritems():
if not isinstance(v, property):
continue
if v.fget:
self.encodable_properties.update([k])
if v.fset:
self.decodable_properties.update([k])
else:
self.readonly_attrs.update([k])
mro = inspect.getmro(self.klass)[1:]
for c in mro:
self._compile_base_class(c)
self.getCustomProperties()
self._finalise_compile()
def _compile_base_class(self, klass):
if klass is object:
return
try:
alias = pyamf.get_class_alias(klass)
except UnknownClassAlias:
alias = pyamf.register_class(klass)
alias.compile()
self.bases.append((klass, alias))
if alias.exclude_attrs:
self.exclude_attrs.update(alias.exclude_attrs)
if alias.readonly_attrs:
self.readonly_attrs.update(alias.readonly_attrs)
if alias.static_attrs:
self.static_attrs_set.update(alias.static_attrs)
for a in alias.static_attrs:
if a not in self.static_attrs:
self.static_attrs.insert(0, a)
if alias.proxy_attrs:
self.proxy_attrs.update(alias.proxy_attrs)
if alias.encodable_properties:
self.encodable_properties.update(alias.encodable_properties)
if alias.decodable_properties:
self.decodable_properties.update(alias.decodable_properties)
if self.amf3 is None and alias.amf3:
self.amf3 = alias.amf3
if self.dynamic is None and alias.dynamic is not None:
self.inherited_dynamic = alias.dynamic
if alias.sealed is not None:
self.inherited_sealed = alias.sealed
if alias.synonym_attrs:
self.synonym_attrs, x = alias.synonym_attrs.copy(), self.synonym_attrs
self.synonym_attrs.update(x)
def _finalise_compile(self):
if self.dynamic is None:
self.dynamic = True
if self.inherited_dynamic is not None:
if self.inherited_dynamic is False and not self.sealed and self.inherited_sealed:
self.dynamic = True
else:
self.dynamic = self.inherited_dynamic
if self.sealed:
self.dynamic = False
if self.amf3 is None:
self.amf3 = False
if self.external is None:
self.external = False
if self.static_attrs:
self.encodable_properties.update(self.static_attrs)
self.decodable_properties.update(self.static_attrs)
if self.static_attrs:
if self.exclude_attrs:
self.static_attrs_set.difference_update(self.exclude_attrs)
for a in self.static_attrs_set:
if a not in self.static_attrs:
self.static_attrs.remove(a)
if not self.exclude_attrs:
self.exclude_attrs = None
else:
self.encodable_properties.difference_update(self.exclude_attrs)
self.decodable_properties.difference_update(self.exclude_attrs)
if self.exclude_attrs is not None:
self.exclude_attrs = list(self.exclude_attrs)
self.exclude_attrs.sort()
if not self.readonly_attrs:
self.readonly_attrs = None
else:
self.decodable_properties.difference_update(self.readonly_attrs)
if self.readonly_attrs is not None:
self.readonly_attrs = list(self.readonly_attrs)
self.readonly_attrs.sort()
if not self.proxy_attrs:
self.proxy_attrs = None
else:
self.proxy_attrs = list(self.proxy_attrs)
self.proxy_attrs.sort()
if len(self.decodable_properties) == 0:
self.decodable_properties = None
else:
self.decodable_properties = list(self.decodable_properties)
self.decodable_properties.sort()
if len(self.encodable_properties) == 0:
self.encodable_properties = None
else:
self.encodable_properties = list(self.encodable_properties)
self.encodable_properties.sort()
self.non_static_encodable_properties = None
if self.encodable_properties:
self.non_static_encodable_properties = set(self.encodable_properties)
if self.static_attrs:
self.non_static_encodable_properties.difference_update(self.static_attrs)
self.shortcut_encode = True
self.shortcut_decode = True
if (self.encodable_properties or self.static_attrs or
self.exclude_attrs or self.proxy_attrs or self.external or
self.synonym_attrs):
self.shortcut_encode = False
if (self.decodable_properties or self.static_attrs or
self.exclude_attrs or self.readonly_attrs or
not self.dynamic or self.external or self.synonym_attrs):
self.shortcut_decode = False
self.is_dict = False
if issubclass(self.klass, dict) or self.klass is dict:
self.is_dict = True
self._compiled = True
def is_compiled(self):
return self._compiled
def __str__(self):
return self.alias
def __repr__(self):
k = self.__class__
return '<%s.%s alias=%r class=%r @ 0x%x>' % (k.__module__, k.__name__,
self.alias, self.klass, id(self))
def __eq__(self, other):
if isinstance(other, basestring):
return self.alias == other
elif isinstance(other, self.__class__):
return self.klass == other.klass
elif isinstance(other, python.class_types):
return self.klass == other
else:
return False
def __hash__(self):
return id(self)
def checkClass(self, klass):
"""
This function is used to check if the class being aliased fits certain
criteria. The default is to check that C{__new__} is available or the
C{__init__} constructor does not need additional arguments. If this is
the case then L{TypeError} will be raised.
@since: 0.4
"""
# Check for __new__ support.
if hasattr(klass, '__new__') and hasattr(klass.__new__, '__call__'):
# Should be good to go.
return
# Check that the constructor of the class doesn't require any additonal
# arguments.
if not (hasattr(klass, '__init__') and hasattr(klass.__init__, '__call__')):
return
klass_func = klass.__init__.im_func
if not hasattr(klass_func, 'func_code'):
# Can't examine it, assume it's OK.
return
if klass_func.func_defaults:
available_arguments = len(klass_func.func_defaults) + 1
else:
available_arguments = 1
needed_arguments = klass_func.func_code.co_argcount
if available_arguments >= needed_arguments:
# Looks good to me.
return
spec = inspect.getargspec(klass_func)
raise TypeError("__init__ doesn't support additional arguments: %s"
% inspect.formatargspec(*spec))
def getEncodableAttributes(self, obj, codec=None):
"""
Must return a C{dict} of attributes to be encoded, even if its empty.
@param codec: An optional argument that will contain the encoder
instance calling this function.
@since: 0.5
"""
if not self._compiled:
self.compile()
if self.is_dict:
return dict(obj)
if self.shortcut_encode and self.dynamic:
return obj.__dict__.copy()
attrs = {}
if self.static_attrs:
for attr in self.static_attrs:
attrs[attr] = getattr(obj, attr, pyamf.Undefined)
if not self.dynamic:
if self.non_static_encodable_properties:
for attr in self.non_static_encodable_properties:
attrs[attr] = getattr(obj, attr)
return attrs
dynamic_props = util.get_properties(obj)
if not self.shortcut_encode:
dynamic_props = set(dynamic_props)
if self.encodable_properties:
dynamic_props.update(self.encodable_properties)
if self.static_attrs:
dynamic_props.difference_update(self.static_attrs)
if self.exclude_attrs:
dynamic_props.difference_update(self.exclude_attrs)
for attr in dynamic_props:
attrs[attr] = getattr(obj, attr)
if self.proxy_attrs is not None and attrs and codec:
context = codec.context
for k, v in attrs.copy().iteritems():
if k in self.proxy_attrs:
attrs[k] = context.getProxyForObject(v)
if self.synonym_attrs:
missing = object()
for k, v in self.synonym_attrs.iteritems():
value = attrs.pop(k, missing)
if value is missing:
continue
attrs[v] = value
return attrs
def getDecodableAttributes(self, obj, attrs, codec=None):
"""
Returns a dictionary of attributes for C{obj} that has been filtered,
based on the supplied C{attrs}. This allows for fine grain control
over what will finally end up on the object or not.
@param obj: The object that will recieve the attributes.
@param attrs: The C{attrs} dictionary that has been decoded.
@param codec: An optional argument that will contain the decoder
instance calling this function.
@return: A dictionary of attributes that can be applied to C{obj}
@since: 0.5
"""
if not self._compiled:
self.compile()
changed = False
props = set(attrs.keys())
if self.static_attrs:
missing_attrs = self.static_attrs_set.difference(props)
if missing_attrs:
raise AttributeError('Static attributes %r expected '
'when decoding %r' % (missing_attrs, self.klass))
props.difference_update(self.static_attrs)
if not props:
return attrs
if not self.dynamic:
if not self.decodable_properties:
props = set()
else:
props.intersection_update(self.decodable_properties)
changed = True
if self.readonly_attrs:
props.difference_update(self.readonly_attrs)
changed = True
if self.exclude_attrs:
props.difference_update(self.exclude_attrs)
changed = True
if self.proxy_attrs is not None and codec:
context = codec.context
for k in self.proxy_attrs:
try:
v = attrs[k]
except KeyError:
continue
attrs[k] = context.getObjectForProxy(v)
if self.synonym_attrs:
missing = object()
for k, v in self.synonym_attrs.iteritems():
value = attrs.pop(k, missing)
if value is missing:
continue
attrs[v] = value
if not changed:
return attrs
a = {}
[a.__setitem__(p, attrs[p]) for p in props]
return a
def applyAttributes(self, obj, attrs, codec=None):
"""
Applies the collection of attributes C{attrs} to aliased object C{obj}.
Called when decoding reading aliased objects from an AMF byte stream.
Override this to provide fine grain control of application of
attributes to C{obj}.
@param codec: An optional argument that will contain the en/decoder
instance calling this function.
"""
if not self._compiled:
self.compile()
if self.shortcut_decode:
if self.is_dict:
obj.update(attrs)
return
if not self.sealed:
obj.__dict__.update(attrs)
return
else:
attrs = self.getDecodableAttributes(obj, attrs, codec=codec)
util.set_attrs(obj, attrs)
def getCustomProperties(self):
"""
Overrride this to provide known static properties based on the aliased
class.
@since: 0.5
"""
def createInstance(self, codec=None):
"""
Creates an instance of the klass.
@return: Instance of C{self.klass}.
"""
if type(self.klass) is type:
return self.klass.__new__(self.klass)
return self.klass()