Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@app.task(base=ProfiledEventTask, bind=True, max_retries=5, default_retry_delay=1, throws=(CartError,))
def apply_voucher(self, event: Event, voucher: str, cart_id: str=None, locale='en', sales_channel='web') -> None:
"""
Applies a voucher code to a cart and commits the change.
:param event: The event in question
:param voucher: A voucher code
:param cart_id: Identifier of the cart, passed on to CartManager
:param locale: Locale that is activated while the voucher is applied
:param sales_channel: Sales channel passed on to CartManager
"""
with language(locale):
try:
try:
cm = CartManager(event=event, cart_id=cart_id, sales_channel=sales_channel)
cm.apply_voucher(voucher)
cm.commit()
except LockTimeoutException:
# The lock could not be obtained in time: ask Celery to re-run this task.
self.retry()
# NOTE(review): the body of this handler is cut off in this excerpt.
except (MaxRetriesExceededError, LockTimeoutException):
@app.task(base=TransactionAwareTask)
def regenerate_organizer_css(organizer_id: int):
# Recompiles the organizer's stylesheets and stores a new file whenever the
# compiled checksum differs from the checksum saved in the organizer settings.
# NOTE(review): the widget branch is cut off in this excerpt.
organizer = Organizer.objects.get(pk=organizer_id)
with scope(organizer=organizer):
# main.scss (compile_scss called without a file argument)
css, checksum = compile_scss(organizer)
# The file name embeds the first 16 characters of the checksum.
fname = 'pub/{}/presale.{}.css'.format(organizer.slug, checksum[:16])
# Only write a new file when the checksum changed since the last run.
if organizer.settings.get('presale_css_checksum', '') != checksum:
newname = default_storage.save(fname, ContentFile(css.encode('utf-8')))
# Store the name returned by the storage backend, not the requested one.
organizer.settings.set('presale_css_file', newname)
organizer.settings.set('presale_css_checksum', checksum)
# widget.scss (compiled with fonts=False)
css, checksum = compile_scss(organizer, file='widget.scss', fonts=False)
fname = 'pub/{}/widget.{}.css'.format(organizer.slug, checksum[:16])
if organizer.settings.get('presale_widget_css_checksum', '') != checksum:
@app.task(base=ProfiledEventTask, bind=True, max_retries=5, default_retry_delay=1, throws=(OrderError,))
def perform_order(self, event: Event, payment_provider: str, positions: List[str],
                  email: str=None, locale: str=None, address: int=None, meta_info: dict=None,
                  sales_channel: str='web', gift_cards: list=None, shown_total=None):
    """
    Celery entry point that delegates to ``_perform_order`` with ``locale`` active.

    Every order argument is forwarded positionally and unchanged, and the
    result of ``_perform_order`` is returned as-is.

    :raises OrderError: with the ``'busy'`` error message when the inner
        ``LockTimeoutException`` handling ends in ``MaxRetriesExceededError``
        or another ``LockTimeoutException``.
    """
    with language(locale):
        try:
            try:
                return _perform_order(
                    event,
                    payment_provider,
                    positions,
                    email,
                    locale,
                    address,
                    meta_info,
                    sales_channel,
                    gift_cards,
                    shown_total,
                )
            except LockTimeoutException:
                # Lock contention: schedule another attempt of this task.
                self.retry()
        except (MaxRetriesExceededError, LockTimeoutException):
            # Out of retries (or still locked): report the system as busy.
            raise OrderError(str(error_messages['busy']))
@app.task(base=EventTask, throws=(OrderError,))
def badges_create_pdf(event: Event, fileid: int, positions: List[int]) -> int:
    """
    Render a PDF for the given order positions into an existing CachedFile.

    :param event: Event passed on to ``render_pdf``
    :param fileid: ID of the ``CachedFile`` that receives the PDF content
    :param positions: IDs used to filter ``OrderPosition`` objects
    :return: Primary key of the saved ``CachedFile``
    """
    cached_file = CachedFile.objects.get(id=fileid)

    selected_positions = OrderPosition.objects.filter(id__in=positions)
    rendered = render_pdf(event, selected_positions)

    # Resolve the target name first, then read the rendered content.
    target_name = cachedfile_name(cached_file, cached_file.filename)
    cached_file.file.save(target_name, ContentFile(rendered.read()))
    cached_file.save()

    return cached_file.pk
@app.task(base=ProfiledEventTask)
def export(event: Event, shredders: List[str]) -> None:
# Writes a ZIP archive for the event into a temporary file.
# NOTE(review): this excerpt is cut off inside the index.json payload, so
# the use of `shredders` and `known_shredders` is not visible here.
known_shredders = event.get_data_shredders()
with NamedTemporaryFile() as rawfile:
with ZipFile(rawfile, 'w') as zipfile:
# Value from get_random_string(6), stored in the archive as CONFIRM_CODE.txt.
ccode = get_random_string(6)
zipfile.writestr(
'CONFIRM_CODE.txt',
ccode,
)
# index.json carries metadata identifying the instance, organizer and event.
zipfile.writestr(
'index.json',
json.dumps({
'instance': settings.SITE_URL,
'organizer': event.organizer.slug,
'event': event.slug,
@app.task(base=TransactionAwareProfiledEventTask)
def vouchers_send(event: Event, vouchers: list, subject: str, message: str, recipients: list, user: int) -> None:
# Distributes the given vouchers among the recipients and mails each
# recipient the codes assigned to them.
# NOTE(review): this excerpt is cut off inside the mail() call.
# `vouchers` arrives as a list of IDs and is rebound to Voucher objects ordered by id.
vouchers = list(Voucher.objects.filter(id__in=vouchers).order_by('id'))
# `user` arrives as a primary key and is rebound to the User instance.
user = User.objects.get(pk=user)
for r in recipients:
voucher_list = []
# Take r['number'] vouchers for this recipient. pop() removes from the end
# of the id-ordered list, so the highest remaining ids are handed out first.
# NOTE(review): pop() raises IndexError if the recipients request more
# vouchers than were loaded -- confirm the caller validates the totals.
for i in range(r['number']):
voucher_list.append(vouchers.pop())
# The mail is built in the event's configured locale.
with language(event.settings.locale):
email_context = get_email_context(event=event, name=r.get('name') or '', voucher_list=[v.code for v in voucher_list])
mail(
r['email'],
subject,
LazyI18nString(message),
email_context,
event,
locale=event.settings.locale,
@app.task
@scopes_disabled()
def update_check():
    """
    Prepare the global update-check settings.

    Generates and stores an ``update_check_id`` if none is set, stops when
    ``update_check_perform`` is falsy, and, when ``'runserver'`` appears in
    ``sys.argv``, records the current time plus a ``'development'`` error
    result before returning.
    """
    global_settings = GlobalSettingsObject()

    if not global_settings.settings.update_check_id:
        global_settings.settings.set('update_check_id', uuid.uuid4().hex)

    if not global_settings.settings.update_check_perform:
        return

    running_dev_server = 'runserver' in sys.argv
    if running_dev_server:
        development_result = {'error': 'development'}
        global_settings.settings.set('update_check_last', now())
        global_settings.settings.set('update_check_result', development_result)
        return
@app.task(base=TransactionAwareTask, bind=True)
def mail_send_task(self, *args, to: List[str], subject: str, body: str, html: str, sender: str,
event: int=None, position: int=None, headers: dict=None, bcc: List[str]=None,
invoices: List[int]=None, order: int=None, attach_tickets=False, user=None,
attach_ical=False) -> bool:
# Builds a CustomEmail from the given parts and, when `html` is provided,
# attaches an HTML alternative to it.
# NOTE(review): this excerpt is cut off at `if event:`; attachments and the
# actual sending are not visible here.
email = CustomEmail(subject, body, sender, to=to, bcc=bcc, headers=headers)
if html is not None:
# The HTML version is wrapped in a multipart/related container together
# with the images returned by replace_images_with_cid_paths
# (presumably referenced by Content-ID -- confirm in that helper).
html_message = SafeMIMEMultipart(_subtype='related', encoding=settings.DEFAULT_CHARSET)
html_with_cid, cid_images = replace_images_with_cid_paths(html)
html_message.attach(SafeMIMEText(html_with_cid, 'html', settings.DEFAULT_CHARSET))
attach_cid_images(html_message, cid_images, verify_ssl=True)
email.attach_alternative(html_message, "multipart/related")
if user:
# `user` arrives as a primary key and is rebound to the User instance.
user = User.objects.get(pk=user)
if event:
@app.task(base=ProfiledTask, bind=True, max_retries=9)
def send_webhook(self, logentry_id: int, action_type: str, webhook_id: int):
# 9 retries with 2**(2*x) timing is roughly 72 hours
with scopes_disabled():
webhook = WebHook.objects.get(id=webhook_id)
with scope(organizer=webhook.organizer):
logentry = LogEntry.all.get(id=logentry_id)
types = get_all_webhook_events()
event_type = types.get(action_type)
if not event_type or not webhook.enabled:
return # Ignore, e.g. plugin not installed
payload = event_type.build_payload(logentry)
t = time.time()
try:
try: