Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@app.task(trail=False)
def backup_service(pk):
service = BackupService.objects.get(pk=pk)
service.ensure_init()
service.backup()
service.prune()
@app.task(trail=False)
def heartbeat():
cache.set('celery_heartbeat', time.time())
@app.task(
trail=False, autoretry_for=(Timeout,), retry_backoff=600, retry_backoff_max=3600
)
def perform_update(cls, pk, auto=False):
try:
if cls == "Project":
obj = Project.objects.get(pk=pk)
else:
obj = Component.objects.get(pk=pk)
if not auto or settings.AUTO_UPDATE:
obj.do_update()
else:
obj.update_remote_branch()
except FileParseError:
# This is stored as alert, so we can silently ignore here
return
@app.task(trail=False)
def ping():
return None
@app.task(trail=False)
def notify_monthly():
notify_digest('notify_monthly')
@app.task(trail=False)
def daily_addons():
for addon in Addon.objects.filter(event__event=EVENT_DAILY).iterator():
with transaction.atomic():
addon.addon.daily(addon.component)
@app.task(trail=False)
def cleanup_project(pk):
"""Perform cleanup of project models."""
try:
project = Project.objects.get(pk=pk)
except Project.DoesNotExist:
return
cleanup_sources(project)
@app.task(trail=False)
def cleanup_old_comments():
if not settings.COMMENT_CLEANUP_DAYS:
return
cutoff = timezone.now() - timedelta(days=settings.COMMENT_CLEANUP_DAYS)
Comment.objects.filter(timestamp__lt=cutoff).delete()
@app.task(trail=False)
def memory_optimize():
memory = TranslationMemory()
memory.index.optimize()
@app.task(trail=False, base=Batches, flush_every=1000, flush_interval=300, bind=True)
def update_memory_task(self, *args, **kwargs):
def fixup_strings(data):
result = {}
for key, value in data.items():
if isinstance(value, int):
result[key] = value
else:
result[key] = force_text(value)
return result
data = extract_batch_kwargs(*args, **kwargs)
memory = TranslationMemory()
try:
with memory.writer() as writer:
for item in data: