strings, rather than kept as lazy objects.
If strings_only is True, don't convert (some) non-string-like objects.
"""
# Handle the common case first for performance reasons.
if issubclass(type(s), unicode):
return s
if strings_only and is_protected_type(s):
return s
try:
if not issubclass(type(s), basestring):
if not PY2:
if isinstance(s, bytes):
s = unicode(s, encoding, errors)
else:
s = unicode(s)
elif hasattr(s, '__unicode__'):
s = unicode(s)
else:
s = unicode(bytes(s), encoding, errors)
else:
# Note: We use .decode() here, instead of unicode(s, encoding,
# errors), so that if s is a SafeBytes, it ends up being a
# SafeText at the end.
s = s.decode(encoding, errors)
except UnicodeDecodeError as e:
if isinstance(s, Exception):
# If we get to here, the caller has passed in an Exception
# subclass populated with non-ASCII bytestring data without a
# working unicode method. Try to handle this without raising a
# further exception by individually forcing the exception args
# to unicode.
def func(value):
if value is None:
return None
t = type(value)
if t is not unicode:
if t is buffer:
value = hexlify(value).decode('ascii')
else:
value = unicode(value)
result = base_func(value)
return result
func.__name__ = name
return func
py_upper = make_string_function('py_upper', unicode.upper)
py_lower = make_string_function('py_lower', unicode.lower)
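# Usage sketch (hedged): the calls above imply an enclosing
# make_string_function(name, base_func) factory and a hexlify import
# (e.g. from binascii) for the buffer branch. The wrappers coerce any
# argument to unicode before delegating, so they are safe to register as
# SQL string functions.
# >>> py_upper('abc')
# u'ABC'
# >>> py_upper(None) is None
# True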
def py_json_unwrap(value):
# [null,some-value] -> some-value
if value is None:
return None
assert value.startswith('[null,'), value
return value[6:-1]
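# Usage sketch: values arrive wrapped as '[null,<value>]' (presumably so a
# JSON null can be told apart from a missing value); the unwrapper strips
# the 6-character '[null,' prefix and the trailing ']'.
# >>> py_json_unwrap('[null,"abc"]')
# '"abc"'
# >>> py_json_unwrap(None) is None
# True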
path_cache = {}
json_path_re = re.compile(r'\[(-?\d+)\]|\.(?:(\w+)|"([^"]*)")', re.UNICODE)
def _parse_path(path):
if path in path_cache:
return path_cache[path]
keys = None
return tuple(item_types), tuple(item_values)
if t.__name__ == 'EntityMeta':
return SetType(value), value
if t.__name__ == 'EntityIter':
entity = value.entity
return SetType(entity), entity
if PY2 and isinstance(value, str):
try:
value.decode('ascii')
except UnicodeDecodeError:
throw(TypeError, 'The bytestring %r contains non-ascii symbols. Try to pass unicode string instead' % value)
else:
return unicode, value
elif isinstance(value, unicode):
return unicode, value
if t in function_types:
return FuncType(value), value
if t is types.MethodType:
return MethodType(value), value
if hasattr(value, '_get_type_'):
return value._get_type_(), value
return normalize_type(t), value
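# The fallback decoder below assumes an enclosing function definition and a
# locale.getpreferredencoding import in the original module: it first tries
# the given (or locale-preferred) encoding, then degrades to lossy ASCII
# replacement instead of raising.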
try: return text.decode(encoding or getpreferredencoding())
except UnicodeDecodeError:
return text.decode('ascii', 'replace')
def compress(s):
zipped = s.encode('zip')
if len(zipped) < len(s): return 'Z' + zipped
return 'N' + s
def decompress(s):
first = s[0]
if first == 'N': return s[1:]
elif first == 'Z': return s[1:].decode('zip')
raise ValueError('Incorrect data')
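# Usage sketch (Python 2 only: str.encode('zip') relies on the zlib codec
# alias, which Python 3 no longer exposes through str.encode). compress()
# prefixes 'Z' when zlib actually shrinks the payload and 'N' otherwise, so
# decompress() can dispatch on the first byte.
# >>> decompress(compress('x' * 1000)) == 'x' * 1000
# True
# >>> decompress(compress('abc'))  # short input stays uncompressed
# 'abc'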
class JsonString(unicode): pass
def json_result(obj, **kwargs):
result = JsonString(json.dumps(obj, **kwargs))
result.media_type = 'application/json'
if 'encoding' in kwargs: result.charset = kwargs['encoding']
return result
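# Usage sketch: JsonString subclasses unicode, so HTTP metadata can ride
# along with the serialized text.
# >>> r = json_result({'ok': True})
# >>> r.media_type
# 'application/json'
# >>> r == u'{"ok": true}'
# True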
expr1_re = re.compile(r'''
([A-Za-z_]\w*) # identifier (group 1)
| ([(]) # open parenthesis (group 2)
''', re.VERBOSE)
expr2_re = re.compile(r'''
\s*(?:
(;) # semicolon (group 1)
| (\.\s*[A-Za-z_]\w*) # dot + identifier (group 2)
def are_comparable_types(t1, t2, op='=='):
# types must be normalized already!
tt1 = type(t1)
tt2 = type(t2)
t12 = {t1, t2}
if Json in t12 and t12 < {Json, str, unicode, int, bool, float}:
return True
if op in ('in', 'not in'):
if tt2 is RawSQLType: return True
if tt2 is not SetType: return False
op = '=='
t2 = t2.item_type
tt2 = type(t2)
if op in ('is', 'is not'):
return t1 is not None and t2 is NoneType
if tt1 is tuple:
if tt2 is not tuple: return False
if len(t1) != len(t2): return False
for item1, item2 in izip(t1, t2):
if not are_comparable_types(item1, item2): return False
return True
if tt1 is RawSQLType or tt2 is RawSQLType: return True
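# Summary of the rules above: Json compares with primitive types; 'in'/'not in'
# unwrap the right-hand SetType and recurse as '=='; 'is'/'is not' only make
# sense against NoneType; tuples compare element-wise; RawSQLType compares
# with anything.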
def upath(path):
"""
Always return a unicode path.
"""
if PY2 and not isinstance(path, unicode):
fs_encoding = sys.getfilesystemencoding() or sys.getdefaultencoding()
return path.decode(fs_encoding)
return path
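# Usage sketch: on Python 2, byte paths coming from os APIs are decoded with
# the filesystem encoding (falling back to the default encoding); on Python 3,
# or for text input, the path passes through unchanged.
# >>> upath('/tmp/data')  # PY2 -> u'/tmp/data', PY3 -> '/tmp/data'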
def tostring(x):
if isinstance(x, basestring): return x
if hasattr(x, '__unicode__'):
try: return unicode(x)
except: pass
if hasattr(x, 'makeelement'): return cElementTree.tostring(x)
try: return str(x)
except: pass
try: return repr(x)
except: pass
if type(x) == types.InstanceType: return '<%s instance at 0x%X>' % (x.__class__.__name__, id(x))
return '<%s object at 0x%X>' % (x.__class__.__name__, id(x))
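# Behaviour sketch: tostring() tries, in order, the value itself (if already a
# string), __unicode__, ElementTree serialization for element-like objects
# (the cElementTree import is assumed from the original module), str(), repr(),
# and finally a generic '<Class ... at 0x...>' placeholder built from the class
# name and id(x).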
import sys, types, weakref
from decimal import Decimal
from datetime import date, time, datetime, timedelta
from functools import wraps, WRAPPER_ASSIGNMENTS
from uuid import UUID
from pony.utils import throw, parse_expr, deref_proxy
NoneType = type(None)
class LongStr(str):
lazy = True
if PY2:
class LongUnicode(unicode):
lazy = True
else:
LongUnicode = LongStr
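# Note: lazy = True marks these as lazily loaded large-text column types; on
# Python 3 LongUnicode is simply an alias of LongStr because str is already
# unicode there.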
class SetType(object):
__slots__ = 'item_type'
def __deepcopy__(self, memo):
return self # SetType instances are "immutable"
def __init__(self, item_type):
self.item_type = item_type
def __eq__(self, other):
return type(other) is SetType and self.item_type == other.item_type
def __ne__(self, other):
return type(other) is not SetType or self.item_type != other.item_type
def __hash__(self):
return hash(self.item_type) + 1
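# Usage sketch: SetType instances compare and hash purely by item_type, so two
# wrappers around the same type are interchangeable, e.g. as dict keys.
# >>> SetType(int) == SetType(int)
# True
# >>> hash(SetType(int)) == hash(int) + 1
# True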
def new(translator, type, paramkey):
type = normalize_type(type)
if type in numeric_types: cls = translator.NumericParamMonad
elif type is unicode: cls = translator.StringParamMonad
elif type is date: cls = translator.DateParamMonad
elif type is time: cls = translator.TimeParamMonad
elif type is timedelta: cls = translator.TimedeltaParamMonad
elif type is datetime: cls = translator.DatetimeParamMonad
elif type is buffer: cls = translator.BufferParamMonad
elif type is UUID: cls = translator.UuidParamMonad
elif isinstance(type, EntityMeta): cls = translator.ObjectParamMonad
else: throw(NotImplementedError, 'Parameter {EXPR} has unsupported type %r' % (type,))
result = cls(translator, type, paramkey)
result.aggregated = False
return result
def __new__(cls, *args):
attrs = rattr, attr
result.append(attrs)
return result
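# The diff walk below emits migration operations per entity: newly added
# optional string attributes get an empty-string sql_default, other new
# non-nullable attributes without a default ask the questioner for an initial
# value and then become AddAttr; removed attributes become RemoveAttr; modified
# attributes become AddAttr when the previous version had a reverse side,
# otherwise ModifyAttr (reverse attributes themselves are skipped).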
pairs = {}
for ename, (aadded, aremoved, amodified) in sorted(emodified.items()):
# Get entity operations
for aname, attr in sorted(aadded.items()):
if not attr.reverse:
if issubclass(attr.py_type, str) and isinstance(attr, orm.Optional):
kwargs = attr._constructor_args[1]
value_class = self.db.provider.sqlbuilder_cls.value_class
value = value_class(self.db.provider.paramstyle, '')
kwargs.update(sql_default=unicode(value))
elif not attr.nullable and attr.initial is None and attr.default is None and not attr.is_pk:
initial = self.questioner.ask_not_null_addition(aname, ename)
attr.initial = attr._constructor_args[1]['initial'] = initial
result.append(ops.AddAttr(ename, aname, attr))
for aname, attr in sorted(aremoved.items()):
if not attr.reverse:
result.append(ops.RemoveAttr(ename, aname))
for aname, attr in sorted(amodified.items()):
if not attr.reverse:
attr_prev = self.db_prev.entities[ename]._adict_[aname]
result.append(ops.AddAttr(ename, aname, attr) if attr_prev.reverse else ops.ModifyAttr(ename, aname, attr))
new_entity = entities.get(ename) or entities.get(entity_renames.get(ename))
assert new_entity is not None
adict = {a.name: a for a in new_entity._new_attrs_}