Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
for arg in args[1:]:
t2 = arg.type
if t2 == 'METHOD': raise_forgot_parentheses(arg)
t3 = coerce_types(t, t2)
if t3 is None: throw(IncomparableTypesError, t, t2)
t = t3
if t3 in numeric_types and translator.dialect == 'PostgreSQL':
args = list(args)
for i, arg in enumerate(args):
if arg.type is bool:
args[i] = NumericExprMonad(translator, int, [ 'TO_INT', arg.getsql() ])
sql = [ sqlop ] + [ arg.getsql()[0] for arg in args ]
return translator.ExprMonad.new(translator, t, sql)
class FuncSelectMonad(FuncMonad):
    """Function monad registered for ``core.select``."""

    func = core.select

    def call(monad, queryset):
        """Validate the argument of ``select(...)`` and hand it back unchanged.

        The argument must be an instance of the translator's
        ``QuerySetMonad``; anything else is reported through ``throw`` as a
        ``TypeError``.
        """
        expected_type = monad.translator.QuerySetMonad
        if not isinstance(queryset, expected_type):
            throw(
                TypeError,
                "'select' function expects generator expression, got: {EXPR}",
            )
        return queryset
class FuncExistsMonad(FuncMonad):
    """Function monad registered for ``core.exists``."""

    func = core.exists

    def call(monad, arg):
        """Translate ``exists(arg)`` into the argument's ``nonzero()`` monad.

        The argument must be an instance of the translator's ``SetMixin``;
        anything else is reported through ``throw`` as a ``TypeError``.
        """
        set_mixin = monad.translator.SetMixin
        if not isinstance(arg, set_mixin):
            throw(
                TypeError,
                "'exists' function expects generator expression or collection, got: {EXPR}",
            )
        return arg.nonzero()
class FuncDescMonad(FuncMonad):
func = core.desc
def call(monad, expr):
def apply(self, vdb):
    """Register this operation's composite index on its entity in *vdb*.

    The entity is looked up by ``self.entity_name``; ``self.attr_names`` must
    be a tuple (otherwise a ``core.MappingError`` is reported via ``throw``).
    Each attribute name is resolved through ``entity.get_attr`` and the tuple
    of names is appended to ``entity.composite_indexes``.  Unless
    ``vdb.vdb_only`` is set, the change is then pushed to the schema: as raw
    SQL when ``self.sql`` is truthy, otherwise via ``self.apply_to_schema``.
    """
    entity = vdb.entities[self.entity_name]
    attr_names = self.attr_names
    if not isinstance(attr_names, tuple):
        throw(core.MappingError, 'Composite index should be defined as tuple')

    # Resolve every attribute up front, in declaration order.
    attrs = [entity.get_attr(attr_name) for attr_name in attr_names]
    entity.composite_indexes.append(attr_names)

    if vdb.vdb_only:
        return
    if self.sql:
        vdb.schema.add_sql(self.sql)
    else:
        self.apply_to_schema(vdb, attrs)
func = utils.avg, core.avg
def call(monad, x):
    """Return the result of asking *x* for its ``'AVG'`` aggregate."""
    averaged = x.aggregate('AVG')
    return averaged
class FuncDistinctMonad(FuncMonad):
    """Function monad registered for ``utils.distinct`` and ``core.distinct``."""

    func = utils.distinct, core.distinct

    def call(monad, x):
        """Translate ``distinct(x)``.

        A ``SetMixin`` argument delegates to its own ``call_distinct()``.
        Any other argument must be a ``NumericMixin`` (otherwise a
        ``TypeError`` is reported via ``throw``); it is shallow-copied and
        the copy is flagged with ``forced_distinct = True``.
        """
        if isinstance(x, SetMixin):
            return x.call_distinct()
        if not isinstance(x, NumericMixin):
            throw(TypeError)

        # Clone without running __init__, then copy the instance state over.
        clone = object.__new__(x.__class__)
        clone.__dict__.update(x.__dict__)
        clone.forced_distinct = True
        return clone
class FuncMinMonad(FuncMonad):
    """Function monad registered for the builtin ``min`` and ``core.min``."""

    func = min, core.min

    def call(monad, *args):
        """Translate ``min(...)``.

        With no arguments a ``TypeError`` is reported via ``throw``; a single
        argument is aggregated with ``'MIN'``; several arguments are handed
        to ``minmax``.
        """
        if not args:
            throw(TypeError, 'min() function expected at least one argument')
        if len(args) == 1:
            only_arg = args[0]
            return only_arg.aggregate('MIN')
        return minmax(monad, 'MIN', *args)
class FuncMaxMonad(FuncMonad):
    """Function monad registered for the builtin ``max`` and ``core.max``."""

    func = max, core.max

    def call(monad, *args):
        """Translate ``max(...)``.

        With no arguments a ``TypeError`` is reported via ``throw``; a single
        argument is aggregated with ``'MAX'``; several arguments are handed
        to ``minmax``.
        """
        if not args:
            throw(TypeError, 'max() function expected at least one argument')
        if len(args) == 1:
            only_arg = args[0]
            return only_arg.aggregate('MAX')
        return minmax(monad, 'MAX', *args)
def minmax(monad, sqlop, *args):
assert len(args) > 1
translator = monad.translator
t = args[0].type
def from_attribute(cls, attr):
name = attr.name
if isinstance(attr, core.PrimaryKey):
attr_class = PrimaryKey
elif isinstance(attr, core.Discriminator):
attr_class = Discriminator
elif isinstance(attr, core.Required):
attr_class = Required
elif isinstance(attr, core.Optional):
attr_class = Optional
elif isinstance(attr, core.Set):
attr_class = Set
else:
assert False
py_type = attr.py_type if not isinstance(attr.py_type, core.EntityMeta) else attr.py_type.__name__
if attr.reverse:
r_entity_name = py_type
r_attr_name = attr.reverse.name
reverse = (r_entity_name, r_attr_name)
def load(cls, dir, filename):
    """Build a migration object from the file *filename* inside *dir*.

    The instance is created as ``cls(dir)`` and named after the file with its
    last three characters removed.  The file is executed and the resulting
    namespace is read for ``dependencies`` (mandatory), ``operations`` and
    ``based_on`` (both default to an empty list) and an optional
    ``data_migration`` callable.

    A missing ``dependencies`` name is reported via ``throw`` as a
    ``core.MigrationError``.
    """
    migration = cls(dir)
    migration.name = filename[:-3]

    path = os.path.join(dir, filename)
    with open(path, 'r') as source_file:
        source = source_file.read()

    # NOTE(review): exec runs whatever code the migration file contains;
    # only load migration files from a trusted location.
    namespace = {}
    exec(source, namespace)

    migration.dependencies = namespace.get('dependencies', None)
    if migration.dependencies is None:
        throw(core.MigrationError, 'Corrupted migration file %r' % filename)
    migration.operations = namespace.get('operations', [])
    migration.based_on = namespace.get('based_on', [])

    data_func = namespace.get('data_migration', None)
    if data_func:
        migration.data = True
        migration.data_migration = data_func
    return migration
result = translator.ExprMonad.new(translator, int, [ 'COUNT', 'ALL' ])
result.aggregated = True
return result
class FuncAbsMonad(FuncMonad):
    """Function monad registered for the builtin ``abs``."""

    func = abs

    def call(monad, x):
        """Translate ``abs(x)`` by delegating to the argument's ``abs()``."""
        absolute = x.abs()
        return absolute
class FuncSumMonad(FuncMonad):
    """Function monad registered for the builtin ``sum`` and ``core.sum``."""

    func = sum, core.sum

    def call(monad, x):
        """Translate ``sum(x)`` into the argument's ``'SUM'`` aggregate."""
        total = x.aggregate('SUM')
        return total
class FuncAvgMonad(FuncMonad):
    """Function monad registered for ``utils.avg`` and ``core.avg``."""

    func = utils.avg, core.avg

    def call(monad, x):
        """Translate ``avg(x)`` into the argument's ``'AVG'`` aggregate."""
        average = x.aggregate('AVG')
        return average
class FuncDistinctMonad(FuncMonad):
    """Function monad registered for ``utils.distinct`` and ``core.distinct``."""

    func = utils.distinct, core.distinct

    def call(monad, x):
        """Translate ``distinct(x)``.

        Set-like arguments (``SetMixin``) answer through their own
        ``call_distinct()``.  Numeric arguments (``NumericMixin``) are
        shallow-copied and the copy gets ``forced_distinct = True``.  Any
        other argument is reported via ``throw`` as a ``TypeError``.
        """
        if isinstance(x, SetMixin):
            return x.call_distinct()
        if not isinstance(x, NumericMixin):
            throw(TypeError)

        # Build an uninitialised instance of the same class and mirror x's state.
        duplicate = object.__new__(x.__class__)
        duplicate.__dict__.update(x.__dict__)
        duplicate.forced_distinct = True
        return duplicate
class FuncMinMonad(FuncMonad):
func = min, core.min
def call(monad, *args):
# Apply an "add attribute" operation: attach self.attr to the entity named
# self.entity_name inside the virtual database `vdb`.
# NOTE(review): this copy of the file has lost its indentation, so the nesting
# of the statements below (in particular `attr.initial = attr.provided.initial`
# and `attr.entity = entity`) is not recoverable from here, and the function
# may be truncated after the last visible line — confirm against the upstream
# source before re-indenting.
def apply(self, vdb):
# .get() yields None for an unknown entity; that case is reported just below.
entity = vdb.entities.get(self.entity_name)
attr = self.attr
if entity is None:
throw(core.MigrationError, 'Entity %s does not exist' % self.entity_name)
# Refuse to add an attribute whose name is already present in entity.new_attrs.
if attr.name in entity.new_attrs:
throw(core.MigrationError, 'Attribute %s is already defined' % attr.name)
# Required attributes: an explicit `initial` value must have been provided.
if isinstance(attr, v.Required):
if attr.is_required and attr.provided.initial is None:
throw(core.MigrationError, 'initial option should be specified in case of adding the Required attribute')
attr.initial = attr.provided.initial
# Discriminator attributes fall back to the entity name as their initial value.
if isinstance(attr, v.Discriminator):
if not attr.provided.initial or not attr.initial:
attr.initial = attr.provided.initial = self.entity_name
# Bind the attribute to its entity and register it there.
attr.entity = entity
entity.add_attr(self.attr)
# For relationship attributes, also bind the reverse attribute when both the
# related entity and the reverse attribute can already be found.
if attr.reverse:
r_entity = vdb.entities.get(attr.py_type)
if r_entity is None:
return
# attr.reverse may be an attribute object (use its .name) or already a name.
r_attr = r_entity.get_attr(getattr(attr.reverse, 'name', attr.reverse))
if r_attr is None:
return
r_attr.entity = r_entity
def create_tables(self, connection):
    """Create every schema object that does not yet exist in the database.

    For each object returned by ``self.get_objects_to_create()`` the
    provider is asked (case-insensitively) whether it already exists:

    * not found: each statement from ``obj.get_create_sql()`` is executed on
      a cursor of *connection*, and logged first when ``core.local.debug``
      is set;
    * found under a name that differs from ``provider.base_name(obj.name)``:
      a ``core.SchemaError`` is reported via ``throw``;
    * found under the expected name: nothing is done.
    """
    provider = self.provider
    cursor = connection.cursor()
    for obj in self.get_objects_to_create():
        expected_name = provider.base_name(obj.name)
        existing_name = obj.exists(provider, connection, case_sensitive=False)

        if existing_name is None:
            for op in obj.get_create_sql():
                if core.local.debug:
                    core.log_sql(op.sql)
                provider.execute(cursor, op.sql)
        elif existing_name != expected_name:
            quote = provider.quote_name
            wanted, found = quote(obj.name), quote(existing_name)
            kind = obj.typename
            throw(
                core.SchemaError,
                (
                    '%s %s cannot be created, because %s '
                    '(with a different letter case) already exists in the database. '
                    'Try to delete %s first.'
                )
                % (kind, wanted, found, found),
            )
def rollback(provider, connection, cache=None):
    """Roll back *connection* and clear the cache's in-transaction flag.

    ``'ROLLBACK'`` is logged through ``core.log_orm`` when
    ``core.local.debug`` is set.  When a *cache* is supplied, its
    ``in_transaction`` attribute is reset to ``False`` after the rollback.
    """
    core = pony.orm.core
    if core.local.debug:
        core.log_orm('ROLLBACK')
    connection.rollback()
    if cache is None:
        return
    cache.in_transaction = False
# Prepare `connection` for a new transaction according to the db_session
# stored on `cache`.
# NOTE(review): this copy of the file has lost its indentation, so it cannot
# be told from here whether `cache.immediate = True` belongs to the
# serializable branch or runs unconditionally — confirm against the upstream
# source before re-indenting.
def set_transaction_mode(provider, connection, cache):
# Must not be called while the cache already considers a transaction open.
assert not cache.in_transaction
db_session = cache.db_session
# Serializable sessions issue an explicit isolation-level statement on a
# fresh cursor.
if db_session is not None and db_session.serializable:
cursor = connection.cursor()
sql = 'SET TRANSACTION ISOLATION LEVEL SERIALIZABLE'
# The statement is logged only when core.local.debug is set.
if core.local.debug: log_orm(sql)
cursor.execute(sql)
cache.immediate = True
# Serializable or DDL sessions are marked as being inside a transaction.
if db_session is not None and (db_session.serializable or db_session.ddl):
cache.in_transaction = True