Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def setUpTestData(cls):
o = Organizer.objects.create(name='Dummy', slug='dummy')
cls.event = Event.objects.create(
organizer=o, name='Dummy', slug='dummy',
date_from=now(),
)
cls.item = Item.objects.create(event=cls.event, name='Dummy', default_price=14)
cls.property = Property.objects.create(event=cls.event, name='Size')
cls.value1 = PropertyValue.objects.create(prop=cls.property, value='S')
cls.value2 = PropertyValue.objects.create(prop=cls.property, value='M')
cls.value3 = PropertyValue.objects.create(prop=cls.property, value='L')
cls.variation1 = ItemVariation.objects.create(item=cls.item)
cls.variation1.values.add(cls.value1)
cls.variation2 = ItemVariation.objects.create(item=cls.item)
cls.variation2.values.add(cls.value2)
cls.variation3 = ItemVariation.objects.create(item=cls.item)
cls.variation3.values.add(cls.value3)
def clean_slug(self):
slug = self.cleaned_data['slug']
if Event.objects.filter(slug__iexact=slug, organizer=self.organizer).exists():
raise forms.ValidationError(
self.error_messages['duplicate_slug'],
code='duplicate_slug'
)
return slug
"detailed configuration later."),
required=False
)
team = forms.ModelChoiceField(
label=_("Grant access to team"),
help_text=_("You are allowed to create events under this organizer, however you do not have permission "
"to edit all events under this organizer. Please select one of your existing teams that will"
" be granted access to this event."),
queryset=Team.objects.none(),
required=False,
empty_label=_('Create a new team for this event with me as the only member')
)
class Meta:
model = Event
fields = [
'name',
'slug',
'currency',
'date_from',
'date_to',
'presale_start',
'presale_end',
'location',
'geo_lat',
'geo_lon',
]
field_classes = {
'date_from': SplitDateTimeField,
'date_to': SplitDateTimeField,
'presale_start': SplitDateTimeField,
if not gs.settings.update_check_perform:
return
if 'runserver' in sys.argv:
gs.settings.set('update_check_last', now())
gs.settings.set('update_check_result', {
'error': 'development'
})
return
check_payload = {
'id': gs.settings.get('update_check_id'),
'version': __version__,
'events': {
'total': Event.objects.count(),
'live': Event.objects.filter(live=True).count(),
},
'plugins': [
{
'name': p.module,
'version': p.version
} for p in get_all_plugins()
]
}
try:
r = requests.post('https://pretix.eu/.update_check/', json=check_payload)
gs.settings.set('update_check_last', now())
if r.status_code != 200:
gs.settings.set('update_check_result', {
'error': 'http_error'
})
else:
def refresh_quota_caches():
# Active events
active = LogEntry.objects.using(settings.DATABASE_REPLICA).filter(
datetime__gt=now() - timedelta(days=7)
).order_by().values('event').annotate(
last_activity=Max('datetime')
)
for a in active:
try:
e = Event.objects.using(settings.DATABASE_REPLICA).get(pk=a['event'])
except Event.DoesNotExist:
continue
quotas = e.quotas.filter(
Q(cached_availability_time__isnull=True) |
Q(cached_availability_time__lt=a['last_activity']) |
Q(cached_availability_time__lt=now() - timedelta(hours=2))
).filter(
Q(subevent__isnull=True) |
Q(subevent__date_to__isnull=False, subevent__date_to__gte=now() - timedelta(days=14)) |
Q(subevent__date_from__gte=now() - timedelta(days=14))
)
for q in quotas:
q.availability()
def validate(self, data):
data = super().validate(data)
full_data = self.to_internal_value(self.to_representation(self.instance)) if self.instance else {}
full_data.update(data)
Event.clean_dates(data.get('date_from'), data.get('date_to'))
Event.clean_presale(data.get('presale_start'), data.get('presale_end'))
if full_data.get('has_subevents') and full_data.get('seating_plan'):
raise ValidationError('Event series should not directly be assigned a seating plan.')
return data
def refresh_quota_caches():
# Active events
active = LogEntry.objects.using(settings.DATABASE_REPLICA).filter(
datetime__gt=now() - timedelta(days=7)
).order_by().values('event').annotate(
last_activity=Max('datetime')
)
for a in active:
try:
e = Event.objects.using(settings.DATABASE_REPLICA).get(pk=a['event'])
except Event.DoesNotExist:
continue
quotas = e.quotas.filter(
Q(cached_availability_time__isnull=True) |
Q(cached_availability_time__lt=a['last_activity']) |
Q(cached_availability_time__lt=now() - timedelta(hours=2))
).filter(
Q(subevent__isnull=True) |
Q(subevent__date_to__isnull=False, subevent__date_to__gte=now() - timedelta(days=14)) |
Q(subevent__date_from__gte=now() - timedelta(days=14))
)
for q in quotas:
q.availability()
def preview(event: int, provider: str):
event = Event.objects.get(id=event)
with rolledback_transaction(), language(event.settings.locale):
item = event.items.create(name=_("Sample product"), default_price=42.23,
description=_("Sample product description"))
item2 = event.items.create(name=_("Sample workshop"), default_price=23.40)
from pretix.base.models import Order
order = event.orders.create(status=Order.STATUS_PENDING, datetime=now(),
email='sample@pretix.eu',
locale=event.settings.locale,
expires=now(), code="PREVIEW1234", total=119)
scheme = PERSON_NAME_SCHEMES[event.settings.name_scheme]
sample = {k: str(v) for k, v in scheme['sample'].items()}
p = order.positions.create(item=item, attendee_name_parts=sample, price=item.default_price)
s = event.subevents.first()
else:
required_permission = None
if request.user.is_authenticated:
try:
# If this logic is updated, make sure to also update the logic in pretix/control/middleware.py
assert_session_valid(request)
except SessionInvalid:
return False
except SessionReauthRequired:
return False
perm_holder = (request.auth if isinstance(request.auth, (Device, TeamAPIToken))
else request.user)
if 'event' in request.resolver_match.kwargs and 'organizer' in request.resolver_match.kwargs:
request.event = Event.objects.filter(
slug=request.resolver_match.kwargs['event'],
organizer__slug=request.resolver_match.kwargs['organizer'],
).select_related('organizer').first()
if not request.event or not perm_holder.has_event_permission(request.event.organizer, request.event, request=request):
return False
request.organizer = request.event.organizer
if isinstance(perm_holder, User) and perm_holder.has_active_staff_session(request.session.session_key):
request.eventpermset = SuperuserPermissionSet()
else:
request.eventpermset = perm_holder.get_event_permission_set(request.organizer, request.event)
if required_permission and required_permission not in request.eventpermset:
return False
elif 'organizer' in request.resolver_match.kwargs:
if not request.organizer or not perm_holder.has_organizer_permission(request.organizer, request=request):
def __call__(self, *args, **kwargs):
if 'event_id' in kwargs:
event_id = kwargs.get('event_id')
with scopes_disabled():
event = Event.objects.select_related('organizer').get(pk=event_id)
del kwargs['event_id']
kwargs['event'] = event
elif 'event' in kwargs:
event_id = kwargs.get('event')
with scopes_disabled():
event = Event.objects.select_related('organizer').get(pk=event_id)
kwargs['event'] = event
else:
args = list(args)
event_id = args[0]
with scopes_disabled():
event = Event.objects.select_related('organizer').get(pk=event_id)
args[0] = event
with scope(organizer=event.organizer):
ret = super().__call__(*args, **kwargs)