Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@db_task()
def foo(number):
    """Demo task: print a marker showing the argument it was invoked with.

    :param number: value to echo; converted with str() for display
    :return: None
    """
    # f-string replaces the dated %-formatting; output is byte-identical
    # because both %s and f-string interpolation apply str().
    print(f'foo({number})')
@db_task()
def send_email(recipient, subject, body, sender):
    """
    Sends the given application e-mail.

    :param recipient: destination address (wrapped in a 1-tuple for the API)
    :param subject: message subject line
    :param body: message body; also attached as the text/html alternative
    :param sender: "from" address
    :return: None
    """
    if not mail.init():
        logger.error("email failed: SMTP not configured")
        # BUG FIX: previously fell through and attempted to build and send
        # the message even though SMTP was reported as not configured.
        return
    msg = EmailMultiAlternatives(subject, body, sender, (recipient, ))
    msg.attach_alternative(body, "text/html")
    # EmailMultiAlternatives.send() returns the number of messages sent;
    # 0 means delivery failed.
    if msg.send() == 0:
        logger.error("email failed: %s - '%s'", recipient, subject)
    else:
        logger.info("email sent: %s - '%s'", recipient, subject)
@db_task()
def load_tasks_async():
    """Import tasks from the configured git repository.

    Serialized by the 'import-lock' huey lock so that concurrent imports
    cannot interleave.
    """
    with HUEY.lock_task('import-lock'):
        # Repository is constructed inside the lock, matching the original
        # ordering of side effects.
        repository = GitRepository(
            settings.REPOSITORY_ROOT,
            url=config.GITLOAD_URL,
            branch=config.GITLOAD_BRANCH,
        )
        load_tasks(repository)
@db_task()
def update_tasks():
    """Refresh the task checkout from git and process every task directory."""
    git_root = settings.GIT_ROOT
    # Pull the latest changes; skipped entirely when DEBUG is set.
    if not settings.DEBUG:
        logger.info("Pulling changes from git")
        GitRepository(git_root).pull()
    # A task directory is any immediate subdirectory of git_root that
    # contains a META_FILE.
    task_dirs = [dirname(meta) for meta in glob(join(git_root, '*', META_FILE))]
    logger.info("found %d task directories", len(task_dirs))
    for directory in task_dirs:
        process_dir(directory)
    # NOTE(review): a dangling "ZIP the tests" comment followed this loop in
    # the original; the corresponding code is not present here — confirm
    # whether a step is missing.
@db_task()
def perform_github_import(login_or_name, repository, branch):
    """Asynchronously import signatures from a GitHub repository branch.

    :param login_or_name: GitHub user login or organization name
    :param repository: repository name
    :param branch: branch to import from
    """
    logger.info(
        "Importing github repo %s/%s/%s", login_or_name, repository, branch
    )
    Signature.import_from_github_repository(login_or_name, repository, branch)
@db_task(retries=3)
def updateSquareFees(paymentRecord):
    '''
    Copy the net fees from a Square payment record onto its invoice.

    The Square Checkout API does not calculate fees immediately, so this task
    is called to be asynchronously run 1 minute after the initial transaction,
    so that any Invoice or ExpenseItem associated with this transaction also
    remains accurate.

    :param paymentRecord: payment record exposing ``netFees`` and ``invoice``
    :return: the net fee amount that was applied to the invoice
    '''
    net_fees = paymentRecord.netFees
    related_invoice = paymentRecord.invoice
    related_invoice.fees = net_fees
    related_invoice.save()
    # Re-distribute fees across whatever the invoice allocates them to.
    related_invoice.allocateFees()
    return net_fees
@db_task()
def update_grid_cache(pkey):
    """Rebuild the grid cache for one Dataset, guarded by a per-pk lock.

    :param pkey: primary key of the Dataset to refresh
    :return: a human-readable status string describing the outcome
    """
    with HUEY.lock_task('grid-cache-{}'.format(pkey)):
        # The whole lookup+update is kept in one try block so an
        # AttributeError from any of these statements maps to the same
        # status message as before.
        try:
            dataset = Dataset.objects.get(pk=pkey)
            dataset.update_grid_cache()
            return 'Updated {} ({!s})'.format(dataset.name, dataset.pk)
        except Dataset.DoesNotExist:
            return 'Dataset did not exist, can not complete task'
        except AttributeError:
            return 'No update_grid_cache method on this dataset'
@db_task()
def ensure_pages_visited(participant_pks):
    """This is necessary when a wait page is followed by a timeout page.
    We can't guarantee the user's browser will properly continue to poll
    the wait page and get redirected, so after a grace period we load the page
    automatically, to kick off the expiration timer of the timeout page.
    """
    # Imported lazily, presumably to avoid an import cycle — TODO confirm.
    from otree.models.participant import Participant
    # we used to filter by _index_in_pages, but that is not reliable,
    # because of the race condition described above.
    unvisited_participants = Participant.objects.filter(pk__in=participant_pks)
    for participant in unvisited_participants:
        # if the wait page is the first page,
        # then _current_form_page_url could be null.
        # NOTE(review): the loop body is missing here — the statements that
        # actually visit the page appear to have been truncated; as written
        # this is a syntax error. Restore the original body.
@db_task()
def update_layers(pkey):
    """Process the layers of one Dataset, guarded by a per-pk lock.

    :param pkey: primary key of the Dataset to process
    :return: a human-readable status string describing the outcome
    """
    with HUEY.lock_task('process-{}'.format(pkey)):
        # The whole lookup+update is kept in one try block so an
        # AttributeError from any of these statements maps to the same
        # status message as before.
        try:
            dataset = Dataset.objects.get(pk=pkey)
            dataset.update_layers()
            return 'Processed {} ({!s})'.format(dataset.name, dataset.pk)
        except Dataset.DoesNotExist:
            return 'Dataset did not exist, can not complete task'
        except AttributeError:
            return 'No update_layers method on this dataset'
@db_task(retries=1000, retry_delay=60)
def create_module(type_id, item_id, force=False):
if not force:
try:
return Module.objects.get(id=item_id)
except Module.DoesNotExist:
pass
try:
module_data = ESI.request(
'get_dogma_dynamic_items_type_id_item_id',
type_id=type_id,
item_id=item_id
).data
except EsiException as e:
logger.exception("Retrieval of stats for module %d failed (status %d)", item_id, e.status)
return