Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_org_switch(client):
    """Test organisation switching."""
    # Start from an ORCID iD that is shared by more than one User row.
    shared_orcid_row = (
        User.select(fn.COUNT(User.orcid).alias("id_count"), User.orcid)
        .group_by(User.orcid)
        .having(fn.COUNT(User.orcid) > 1)
        .objects()
        .first()
    )
    user = User.get(orcid=shared_orcid_row.orcid)

    # Link the user to an organisation that no row with this ORCID iD is
    # linked to yet.
    existing_links = UserOrg.select().join(User, on=UserOrg.user).where(User.orcid == user.orcid)
    linked_org_ids = [link.org_id for link in existing_links]
    new_org = Organisation.select().where(Organisation.id.not_in(linked_org_ids)).first()
    UserOrg.create(user=user, org=new_org, affiliations=0)

    resp = client.login(user, follow_redirects=True)
    assert user.email.encode() in resp.data
    assert len(user.org_links) > 1
    assert current_user == user

    # Nothing changes if it is the same organisation
    current_link = user.user_orgs.where(UserOrg.org_id == user.organisation_id).first()
    resp = client.get(f"/select/user_org/{current_link.id}", follow_redirects=True)
    assert User.get(user.id).organisation_id == user.organisation_id
    assert user.email.encode() in resp.data

    # The current org changes if it's a different org on the list
    # NOTE(review): the excerpt stops after this comment; the assertions it
    # announces are not visible here.
def search_bots(query):
    """Collect bots matching a free-text query.

    The query is lower-cased and stripped, then matched in two passes whose
    results are merged into one set:

    * direct matches: username contains the query, name equals one of the
      space-separated words, or ``extra`` matches the query via peewee's
      ``**`` operator;
    * keyword matches: a joined ``Keyword`` name equals one of the words.

    Both passes only consider bots that are approved, not disabled and whose
    revision is not newer than the current ``Revision`` instance.

    :param query: search text typed by the user.
    :returns: for the easter-egg phrases, a one-element list holding the
        ``@botlistbot`` entry.
    """
    query = query.lower().strip()
    split = query.split(' ')

    # easter egg
    if query in ('awesome bot', 'great bot', 'superb bot', 'best bot', 'best bot ever'):
        return [Bot.by_username('@botlistbot')]

    # Every comparison needs its own parentheses: `&` binds tighter than
    # `<=` and `==`, so the unparenthesised form turns into a chained
    # comparison and silently drops the revision/approved conditions.
    visible = (
        (Bot.revision <= Revision.get_instance().nr) &
        (Bot.approved == True) &  # noqa: E712 - peewee needs `==`, not `is`
        (Bot.disabled == False)  # noqa: E712
    )

    # exact results
    where_query = (
        (
            fn.lower(Bot.username).contains(query) |
            (fn.lower(Bot.name) << split) |
            (fn.lower(Bot.extra) ** query)
        ) &
        visible
    )
    results = set(Bot.select().distinct().where(where_query))

    # keyword results
    keyword_results = Bot.select(Bot).join(Keyword).where(
        (fn.lower(Keyword.name) << split) & visible
    )
    results.update(keyword_results)

    # many @usernames
    # NOTE(review): the excerpt ends here; the @username handling and the
    # final return of `results` are not visible and were not reconstructed.
# Optional criteria: a profile qualifies when it satisfies at least one.
or_clauses = []
or_clauses.extend(Profile.skills.contains(skill) for skill in skills_list)
or_clauses.extend(Profile.location.contains(part) for part in location_part_list)
if any(position_title_list):
    # EXISTS (SELECT 1 FROM position WHERE it belongs to the profile and
    # carries one of the requested titles).
    subquery = Position.select(Param('1')).where(
        Position.profile == Profile.id,
        Position.title << position_title_list,
    )
    or_clauses.append(Clause(SQL('EXISTS'), subquery))
if any(or_clauses):
    clauses.append(reduce(operator.or_, or_clauses))

# Leave out every profile that is listed as `Friend.friend` for the
# current user.
friends = Friend.select(Friend.friend).where(Friend.user == current_user.id).execute()
friend_ids = [link.friend.id for link in friends]
clauses.append(~(Profile.id << friend_ids))

# All accumulated clauses must hold; take up to 100 rows in random order.
combined_filter = reduce(operator.and_, clauses)
profiles = Profile.select().where(combined_filter).order_by(fn.Rand()).limit(100)

# Score each candidate: 10 points per matching skill, per matching location
# part and per requested position title the profile holds.
for profile in profiles:
    profile.score = 0
    if profile.skills:
        profile.score += 10 * sum(skill in profile.skills for skill in skills_list)
    if profile.location:
        profile.score += 10 * sum(part in profile.location for part in location_part_list)
    if any(position_title_list):
        profile.position_fetch = profile.positions.execute()
        profile.score += 10 * sum(
            any(position.title == wanted for position in profile.position_fetch)
            for wanted in position_title_list
        )

# Keep the two highest-scoring profiles; the sort is stable, so ties keep
# their query order.
suggested_profiles = sorted(profiles, key=operator.attrgetter('score'), reverse=True)[:2]
def random(config):
    """Get a random aphorism."""
    # Order randomly and cap at one row; the loop body simply does not run
    # when the table is empty.
    one_at_random = Aphorism.select().order_by(fn.Random()).limit(1)
    for entry in one_at_random:
        id_line = 'id:%d' % entry.id
        click.secho(id_line, fg='white')

        quoted_text = '"%s"' % entry.aphorism
        click.secho(quoted_text, fg='white', bold=True)

        attribution = ' -- %s' % entry.author
        click.secho(attribution, fg='green')

        source_line = "(%s)\n" % entry.source
        click.secho(source_line, fg='yellow')
def movDespesa(self):
    """Total the payable accounts that fall in the configured due-date window.

    Sums ``ContaAPagar.valor`` and ``ContaAPagar.valor_pago`` over the rows
    whose ``data_vencimento`` lies between ``self.dataVencimento`` and
    ``self.dataFim``, and stores the totals on ``self.valorAPagar`` and
    ``self.valorPago``. Both sums fall back to 0 when no row matches.
    """
    # NOTE(review): the excerpt opened a `try:` here whose except/finally
    # clause is not visible; it was dropped so the block parses. Restore the
    # original handler if one exists in the full file.

    # Query
    row = (
        ContaAPagar.select(
            peewee.fn.COALESCE(
                peewee.fn.SUM(ContaAPagar.valor), 0
            ).alias('valorAPagar'),
            # The 0 fallback was missing here, so an empty window produced
            # NULL instead of 0 (and a one-argument COALESCE is rejected by
            # some databases).
            peewee.fn.COALESCE(
                peewee.fn.SUM(ContaAPagar.valor_pago), 0
            ).alias('valorPago'),
        )
        .where(ContaAPagar.data_vencimento.between(
            self.dataVencimento, self.dataFim))
    )

    # Saving the result
    for lista in row:
        self.valorAPagar = lista.valorAPagar
        self.valorPago = lista.valorPago
def distinct_texts(self):
    """Return dict rows pairing each distinct ``Text.text`` with its count.

    The query is built from ``self.texts``, grouped by ``Text.text``; the
    occurrence count is exposed under the key ``count``.
    """
    occurrences = fn.Count(Text.text).alias('count')
    grouped = self.texts.select(Text.text, occurrences).group_by(Text.text)
    return grouped.dicts()
def movEntrada(self):
    """Total the receivable accounts that fall in the configured due-date window.

    Sums ``ContaAReceber.valor`` and ``ContaAReceber.valor_recebido`` over
    the rows whose ``data_vencimento`` lies between ``self.dataVencimento``
    and ``self.dataFim``, and stores the totals on ``self.valorAReceber`` and
    ``self.valorRecebido``. Both sums fall back to 0 when no row matches.
    """
    # NOTE(review): the excerpt opened a `try:` here whose except/finally
    # clause is not visible; it was dropped so the block parses. Restore the
    # original handler if one exists in the full file.

    # Query
    row = (
        ContaAReceber.select(
            peewee.fn.COALESCE(
                peewee.fn.SUM(ContaAReceber.valor), 0
            ).alias('valorAReceber'),
            peewee.fn.COALESCE(
                peewee.fn.SUM(ContaAReceber.valor_recebido), 0
            ).alias('valorRecebido'),
        )
        .where(ContaAReceber.data_vencimento.between(
            self.dataVencimento, self.dataFim))
    )

    # Saving the result
    for lista in row:
        self.valorAReceber = lista.valorAReceber
        # The query selects this total too; without the assignment it was
        # computed and then thrown away.
        self.valorRecebido = lista.valorRecebido
def get_max_id_for_repo_mirror_config():
    """Return the largest ``RepoMirrorConfig.id``, read with one ``MAX`` query."""
    highest_id = fn.Max(RepoMirrorConfig.id)
    return RepoMirrorConfig.select(highest_id).scalar()
def assign_shell_account(team):
    """Attach ``team`` to one ``SshAccount`` row taken in random order and save it.

    :param team: value stored on the chosen account's ``team`` field.
    """
    shuffled_accounts = SshAccount.select().order_by(fn.Random())
    account = shuffled_accounts.get()
    account.team = team
    account.save()
def movEntrada(self):
    """Total the receivable accounts that fall in the configured due-date window.

    Sums ``ContaAReceber.valor`` and ``ContaAReceber.valor_recebido`` over
    the rows whose ``data_vencimento`` lies between ``self.dataVencimento``
    and ``self.dataFim``, and stores the totals on ``self.valorAReceber`` and
    ``self.valorRecebido``. Both sums fall back to 0 when no row matches.
    """
    # NOTE(review): the excerpt opened a `try:` here whose except/finally
    # clause is not visible; a `try:` without a handler does not parse, so it
    # was dropped. Restore the original handler if one exists in the full
    # file.

    # Aggregates, each defaulting to 0 when SUM yields NULL.
    sum_or_zero = peewee.fn.COALESCE
    amount_due = sum_or_zero(
        peewee.fn.SUM(ContaAReceber.valor), 0
    ).alias('valorAReceber')
    amount_received = sum_or_zero(
        peewee.fn.SUM(ContaAReceber.valor_recebido), 0
    ).alias('valorRecebido')

    # Restrict to the due-date window held on the instance.
    due_window = ContaAReceber.data_vencimento.between(
        self.dataVencimento, self.dataFim)
    totals_query = ContaAReceber.select(amount_due, amount_received).where(due_window)

    # Saving the result
    for totals in totals_query:
        self.valorAReceber = totals.valorAReceber
        self.valorRecebido = totals.valorRecebido