# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def dispatch(self, request, *args, **kwargs):
    """Validate the crash referenced by the URL before handling the request.

    Rejects the request with HTTP 400 when the crash id is unknown or
    when a description has already been submitted for that crash;
    otherwise defers to the normal view dispatch.
    """
    crash_pk = self.kwargs.get('pk')
    try:
        self.crash = Crash.objects.select_related('crash_description').get(pk=crash_pk)
    except Crash.DoesNotExist:
        return HttpResponseBadRequest('no such crash')

    try:
        existing = self.crash.crash_description
    except ObjectDoesNotExist:
        existing = None
    if existing is not None:
        # A description already exists for this crash; refuse a second one.
        return HttpResponseBadRequest('already reported as \"%s\"' % existing.summary)

    return super(CrashDescriptionFormView, self).dispatch(request, *args, **kwargs)
class SymbolsViewSet(BaseView):
    """API endpoint for debug symbols, listed newest first."""

    queryset = Symbols.objects.all().order_by('-id')
    serializer_class = SymbolsSerializer

    def create(self, request, *args, **kwargs):
        """Create a symbol, translating a uniqueness violation into HTTP 409."""
        try:
            return super(SymbolsViewSet, self).create(request, *args, **kwargs)
        except IntegrityError:
            # The symbol already exists; answer with a conflict rather
            # than letting the database error bubble up as a 500.
            return Response(data={"message": "Duplicate symbol"},
                            status=status.HTTP_409_CONFLICT)
class CrashViewSet(mixins.ListModelMixin,
                   mixins.RetrieveModelMixin,
                   viewsets.GenericViewSet):
    """Read-only API endpoint for crash reports (list and retrieve only),
    ordered newest first and paginated with the standard page size."""

    queryset = Crash.objects.all().order_by('-id')
    serializer_class = CrashSerializer
    pagination_class = StandardResultsSetPagination
def delete_duplicate_crashes(limit=None):
    """Delete crashes whose signature occurs more than ``limit`` times.

    For each over-represented signature, the oldest crashes are removed
    in bulks of at most 1000 until only ``limit`` remain (the newest are
    kept, since deletion starts from the ``created``-ascending end).

    :param limit: maximum crashes to keep per signature; when falsy,
        falls back to the 'Crash__duplicate_number' global preference.
    :return: dict with accumulated ``count``, ``size`` and ``elements``
        describing everything that was deleted.
    """
    logger = logging.getLogger('limitation')
    full_result = dict(count=0, size=0, elements=[])
    if not limit:
        preference_key = '__'.join(['Crash', 'duplicate_number'])
        limit = gpm[preference_key]
    duplicated = Crash.objects.values('signature').annotate(count=Count('signature'))
    # Materialize before logging: on Python 3 a bare filter() object
    # reprs as '<filter object ...>', making the log line useless.
    duplicated = [x for x in duplicated if x['count'] > limit]
    logger.info('Duplicated signatures: %r' % duplicated)
    for group in duplicated:
        qs = Crash.objects.filter(signature=group['signature']).order_by('created')
        dup_elements = []
        dup_count = qs.count()
        while dup_count > limit:
            # Delete at most 1000 rows per round to bound query size.
            bulk_size = dup_count - limit if dup_count - limit < 1000 else 1000
            bulk_ids = qs[:bulk_size].values_list('id', flat=True)
            bulk = qs.filter(id__in=bulk_ids)
            result = bulk_delete(Crash, bulk)
            full_result['count'] += result['count']
            full_result['size'] += result['size']
            full_result['elements'] += result['elements']
            dup_elements += result['elements']
            dup_count -= bulk_size
    # Bug fix: the accumulated result was computed but never returned.
    return full_result
def delete_duplicate_crashes(limit=None):
    """Delete crashes whose signature occurs more than ``limit`` times.

    For each over-represented signature, the oldest crashes are removed
    in bulks of at most 1000 until only ``limit`` remain (the newest are
    kept, since deletion starts from the ``created``-ascending end).

    NOTE(review): this definition duplicates an identical function
    earlier in the file and shadows it at import time — the earlier
    copy should be removed.

    :param limit: maximum crashes to keep per signature; when falsy,
        falls back to the 'Crash__duplicate_number' global preference.
    :return: dict with accumulated ``count``, ``size`` and ``elements``
        describing everything that was deleted.
    """
    logger = logging.getLogger('limitation')
    full_result = dict(count=0, size=0, elements=[])
    if not limit:
        preference_key = '__'.join(['Crash', 'duplicate_number'])
        limit = gpm[preference_key]
    duplicated = Crash.objects.values('signature').annotate(count=Count('signature'))
    # Materialize before logging: on Python 3 a bare filter() object
    # reprs as '<filter object ...>', making the log line useless.
    duplicated = [x for x in duplicated if x['count'] > limit]
    logger.info('Duplicated signatures: %r' % duplicated)
    for group in duplicated:
        qs = Crash.objects.filter(signature=group['signature']).order_by('created')
        dup_elements = []
        dup_count = qs.count()
        while dup_count > limit:
            # Delete at most 1000 rows per round to bound query size.
            bulk_size = dup_count - limit if dup_count - limit < 1000 else 1000
            bulk_ids = qs[:bulk_size].values_list('id', flat=True)
            bulk = qs.filter(id__in=bulk_ids)
            result = bulk_delete(Crash, bulk)
            full_result['count'] += result['count']
            full_result['size'] += result['size']
            full_result['elements'] += result['elements']
            dup_elements += result['elements']
            dup_count -= bulk_size
    return full_result