Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def contains(monad, x, not_in=False):
    """Build a boolean monad testing whether ``x`` is (or is not) among ``monad.items``.

    Every item is first passed to ``check_comparable`` together with ``x``.
    When ``x`` translates to a single SQL expression an ``IN`` / ``NOT_IN``
    node is produced; otherwise (composite value) the test is expanded into
    column-wise comparisons combined with ``sqland`` / ``sqlor``.
    """
    translator = monad.translator
    for item in monad.items:
        check_comparable(item, x)
    left_sql = x.getsql()

    if len(left_sql) == 1:
        operator = 'NOT_IN' if not_in else 'IN'
        candidates = [item.getsql()[0] for item in monad.items]
        sql = [operator, left_sql[0], candidates]
    elif not_in:
        # x is outside the list when, for every item, at least one column differs.
        per_item = []
        for item in monad.items:
            differences = [['NE', a, b] for a, b in izip(left_sql, item.getsql())]
            per_item.append(sqlor(differences))
        sql = sqland(per_item)
    else:
        # x is inside the list when, for some item, all columns are equal.
        per_item = []
        for item in monad.items:
            equalities = [['EQ', a, b] for a, b in izip(left_sql, item.getsql())]
            per_item.append(sqland(equalities))
        sql = sqlor(per_item)
    return translator.BoolExprMonad(translator, sql)
def getsql(monad, subquery=None):
def call(monad, days=None, seconds=None, microseconds=None, milliseconds=None, minutes=None, hours=None, weeks=None):
    """Fold a ``timedelta(...)`` call with constant int arguments into a constant monad.

    Each supplied argument must be a numeric monad whose ``type`` is ``int``
    (otherwise ``TypeError`` is thrown) and must be a ``ConstMonad``
    (otherwise ``NotImplementedError`` is thrown).  Omitted arguments count
    as 0.  The values are passed to ``timedelta`` positionally, in the order
    of this function's parameters.
    """
    translator = monad.translator
    arg_names = ('days', 'seconds', 'microseconds', 'milliseconds', 'minutes', 'hours', 'weeks')
    args = (days, seconds, microseconds, milliseconds, minutes, hours, weeks)
    for arg, name in izip(args, arg_names):
        if arg is not None:
            if not (isinstance(arg, translator.NumericMixin) and arg.type is int):
                throw(TypeError,
                      "'%s' argument of timedelta(...) function must be of 'int' type. Got: %r"
                      % (name, type2str(arg.type)))
            if not isinstance(arg, ConstMonad):
                throw(NotImplementedError)
    values = [0 if given is None else given.value for given in args]
    return translator.ConstMonad.new(translator, timedelta(*values))
def join_tables(alias1, alias2, columns1, columns2):
    """Return the join condition pairing ``columns1`` of ``alias1`` with ``columns2`` of ``alias2``.

    The column sequences must have equal length; the pairwise ``EQ``
    comparisons are combined with ``sqland``.
    """
    assert len(columns1) == len(columns2)
    conditions = []
    for c1, c2 in izip(columns1, columns2):
        left = ['COLUMN', alias1, c1]
        right = ['COLUMN', alias2, c2]
        conditions.append(['EQ', left, right])
    return sqland(conditions)
# NOTE(review): fragment -- the enclosing ``def`` (which supplies ``elem``,
# ``globals`` and ``locals``) is not visible here, and indentation was lost in
# this copy; the nesting described in the comments below is inferred from the
# statements themselves and should be confirmed against the original file.
if locals is None: locals = {}
# Unique sentinel marking "name was not present in locals before the loop".
not_found = object()
old_values = []
# Names this routine writes into ``locals``: the loop variables, the special
# 'for' entry, and every name listed with the assignments.  The ``+`` builds a
# new list, so ``elem.var_names`` itself is not extended.
all_var_names = elem.var_names + [ 'for' ]
for ass_names, _ in elem.assignments: all_var_names.extend(ass_names)
# Snapshot the current values so they can be put back at the end.
for name in all_var_names:
old_values.append(locals.get(name, not_found))
result = []
# NOTE(review): ``list`` shadows the builtin for the rest of this routine.
list = _eval(elem.code, globals, locals)
# Falsy result: evaluate the else-part if there is one, otherwise yield ''.
if not list:
if elem.else_: return elem.else_.eval(globals, locals)
return ''
for i, item in enumerate(list):
# The separator is skipped for the first item (i == 0).
if i and elem.separator:
result.append(elem.separator.eval(globals, locals))
for name, value in izip(elem.var_names, item): locals[name] = value
# Publish (current index, total length) under the 'for' key.
locals['for'] = i, len(list)
# Python 2 ``exec`` statement: runs each assignment's code in the given namespaces.
for _, code in elem.assignments: exec code in globals, locals
result.append(elem.markup.eval(globals, locals))
# Put the namespace back: delete names that were absent before, reinstate the others.
for name, old_value in izip(all_var_names, old_values):
if old_value is not_found: del locals[name]
else: locals[name] = old_value
return elem.empty.join(result)
def read_phrases(lines):
    """Group the lines of a translation file into phrases.

    ``lines`` is any iterable of strings.  Blank lines, whitespace-only
    lines and lines whose first non-blank character is ``#`` are ignored.
    A line starting with whitespace is a translation belonging to the most
    recent key line; any other line starts a new phrase as its key.

    Yields ``(kstr, lstr_list)`` pairs where ``kstr`` is ``(lineno, line)``
    for the key and ``lstr_list`` is a list of ``(lineno, line)`` tuples for
    its translations.  Line numbers are 1-based and count every input line,
    including skipped ones.

    Raises ``I18nParseError`` if a translation line appears before any key.
    """
    kstr, lstr_list = None, []
    # enumerate(..., 1) gives the same 1-based numbering as zipping with a counter.
    for lineno, line in enumerate(lines, 1):
        if not line or line.isspace() or line.lstrip().startswith('#'):
            continue
        if line[0].isspace():
            if kstr is None:
                raise I18nParseError(
                    "Translation string found but key string was expected in line %d" % lineno)
            lstr_list.append((lineno, line))
        elif kstr is None:
            # First key line of the input.
            kstr = lineno, line
        else:
            # A new key closes the previous phrase; start a fresh list so the
            # one already yielded is never mutated afterwards.
            yield kstr, lstr_list
            kstr, lstr_list = (lineno, line), []
    if kstr is not None:
        yield kstr, lstr_list
# NOTE(review): fragment -- the enclosing ``def`` and the initial bindings of
# ``tt1``/``tt2`` are not visible here (the recursive call below suggests the
# function is ``are_comparable_types``), and indentation was lost in this copy;
# nesting is inferred and should be confirmed against the original file.
t12 = {t1, t2}
# Json on either side is accepted when both types fall within this set.
if Json in t12 and t12 < {Json, str, unicode, int, bool, float}:
return True
# Membership tests: a raw-SQL right side is always accepted; otherwise the
# right side must be a SetType, and the check continues as an equality test
# against that set's item type.
if op in ('in', 'not in'):
if tt2 is RawSQLType: return True
if tt2 is not SetType: return False
op = '=='
t2 = t2.item_type
tt2 = type(t2)
# NOTE(review): t1 is compared against ``None`` here while t2 (and the checks
# further down) use ``NoneType`` -- confirm which is intended.
if op in ('is', 'is not'):
return t1 is not None and t2 is NoneType
# Tuples: both sides must be tuples of equal length with pairwise-comparable items.
if tt1 is tuple:
if not tt2 is tuple: return False
if len(t1) != len(t2): return False
for item1, item2 in izip(t1, t2):
# ``op`` is not forwarded, so the callee's default applies (default not visible here).
if not are_comparable_types(item1, item2): return False
return True
if tt1 is RawSQLType or tt2 is RawSQLType: return True
if op in ('==', '<>', '!='):
# NoneType vs NoneType is rejected; NoneType vs anything else is accepted.
if t1 is NoneType and t2 is NoneType: return False
if t1 is NoneType or t2 is NoneType: return True
if t1 in primitive_types:
if t1 is t2: return True
if (t1, t2) in coercions: return True
# issubclass() below requires plain classes on both sides.
if tt1 is not type or tt2 is not type: return False
# An int type paired with a string type is accepted, in either order.
if issubclass(t1, int_types) and issubclass(t2, basestring): return True
if issubclass(t2, int_types) and issubclass(t1, basestring): return True
return False
# The metaclass is matched by name; two such types are accepted when they share the same ``_root_``.
if tt1.__name__ == tt2.__name__ == 'EntityMeta':
return t1._root_ is t2._root_
return False
def check_params(key_params_list, key_lineno, lstr, lstr_lineno, lang_code):
    """Verify that a translation string uses the same parameters as its key.

    Parameters are the ``param_re`` matches found in ``lstr``, except the
    literal ``'$$'``.  Raises ``I18nParseError`` when the number of
    parameters differs from ``key_params_list`` or when, after sorting both
    lists, any pair of parameters differs.
    """
    lstr_params_list = []
    for match in param_re.finditer(lstr):
        token = match.group()
        if token != '$$':
            lstr_params_list.append(token)

    if len(key_params_list) != len(lstr_params_list):
        raise I18nParseError(
            "Parameters count in line %d doesn't match with line %d" % (key_lineno, lstr_lineno))

    for expected, actual in izip(sorted(key_params_list), sorted(lstr_params_list)):
        if expected != actual:
            raise I18nParseError("Unknown parameter in line %d: %s (translation for %s)"
                                 % (lstr_lineno, actual, lang_code))
def join_tables(alias1, alias2, columns1, columns2):
    """Return the join condition pairing ``columns1`` of ``alias1`` with ``columns2`` of ``alias2``.

    The column sequences must have equal length; the pairwise ``EQ``
    comparisons are combined with ``sqland``.
    """
    assert len(columns1) == len(columns2)
    conditions = []
    for c1, c2 in izip(columns1, columns2):
        left = ['COLUMN', alias1, c1]
        right = ['COLUMN', alias2, c2]
        conditions.append(['EQ', left, right])
    return sqland(conditions)
for item_column, new_name in izip(item_columns, new_names) ]
subquery.from_ast.append([ alias, 'SELECT', subquery_ast[1:], sqland(outer_conditions) ])
if not_in: sql_ast = sqland([ [ 'IS_NULL', [ 'COLUMN', alias, new_name ] ]
for new_name in new_names ])
else: sql_ast = [ 'EQ', [ 'VALUE', 1 ], [ 'VALUE', 1 ] ]
else:
if len(item_columns) == 1:
subquery_ast = sub.shallow_copy_of_subquery_ast(is_not_null_checks=not_in)
sql_ast = [ 'NOT_IN' if not_in else 'IN', item_columns[0], subquery_ast ]
elif translator.row_value_syntax:
subquery_ast = sub.shallow_copy_of_subquery_ast(is_not_null_checks=not_in)
sql_ast = [ 'NOT_IN' if not_in else 'IN', [ 'ROW' ] + item_columns, subquery_ast ]
else:
subquery_ast = sub.shallow_copy_of_subquery_ast()
select_ast, from_ast, where_ast = subquery_ast[1:4]
in_conditions = [ [ 'EQ', expr1, expr2 ] for expr1, expr2 in izip(item_columns, select_ast[1:]) ]
if not sub.aggregated: where_ast += in_conditions
else:
having_ast = find_or_create_having_ast(subquery_ast)
having_ast += in_conditions
sql_ast = [ 'NOT_EXISTS' if not_in else 'EXISTS' ] + subquery_ast[2:]
return translator.BoolExprMonad(translator, sql_ast)
def nonzero(monad):